give all folly exception types default visibility
[folly.git] / folly / fibers / AtomicBatchDispatcher.h
1 /*
2  * Copyright 2017-present Facebook, Inc.
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  *   http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 #pragma once
17
18 #include <memory>
19 #include <stdexcept>
20 #include <string>
21 #include <utility>
22 #include <vector>
23
24 #include <folly/CPortability.h>
25 #include <folly/Function.h>
26 #include <folly/Optional.h>
27 #include <folly/fibers/detail/AtomicBatchDispatcher.h>
28 #include <folly/futures/Future.h>
29 #include <folly/futures/Promise.h>
30
31 namespace folly {
32 namespace fibers {
33
34 /**
35  * An exception class that gets thrown when the AtomicBatchDispatcher is used
36  * incorrectly. This is indicative of a bug in the user code.
37  * Examples are, multiple dispatch calls on the same token, trying to get more
38  * tokens from the dispatcher after commit has been called, etc.
39  */
40 class FOLLY_EXPORT ABDUsageException : public std::logic_error {
41   using std::logic_error::logic_error;
42 };
43
44 /**
45  * An exception class that gets set on the promise for dispatched tokens, when
46  * the AtomicBatchDispatcher was destroyed before commit was called on it.
47  */
48 class FOLLY_EXPORT ABDCommitNotCalledException : public std::runtime_error {
49  public:
50   ABDCommitNotCalledException()
51       : std::runtime_error(
52             "AtomicBatchDispatcher destroyed before commit() was called") {}
53 };
54
55 /**
56  * An exception class that gets set on the promise for dispatched tokens, when
57  * one or more other tokens in the batch were destroyed before dispatch was
58  * called on them.
59  * Only here so that the caller can distinguish the real failure cause
60  * rather than these subsequently thrown exceptions.
61  */
62 class FOLLY_EXPORT ABDTokenNotDispatchedException : public std::runtime_error {
63   using std::runtime_error::runtime_error;
64 };
65
66 /**
67  * AtomicBatchDispatcher should be used if you want to process fiber tasks in
68  * parallel, but require to synchronize them at some point. The canonical
69  * example is to create a database transaction dispatch round. This API notably
70  * enforces that all tasks in the batch have reached the synchronization point
71  * before the user provided dispatch function is called with all the inputs
72  * provided in one function call. It also provides a guarantee that the inputs
73  * in the vector of inputs passed to the user provided dispatch function will be
74  * in the same order as the order in which the token for the job was issued.
75  *
76  * Use this when you want all the inputs in the batch to be processed by a
77  * single function call to the user provided dispatch function.
78  * The user provided dispatch function takes a vector of InputT as input and
79  * returns a vector of ResultT.
80  * To use an AtomicBatchDispatcher, create it by providing a dispatch function:
81  * TO EITHER the constructor of the AtomicBatchDispatcher class
82  * (can call reserve method on the dispatcher to reserve space (for number of
83  *  inputs expected)),
84  * OR the createAtomicBatchDispatcher function in folly::fibers namespace
85  *    (optionally specify an initial capacity (for number of inputs expected)).
86  * The AtomicBatchDispatcher object created using this call (dispatcher),
87  * is the only object that can issue tokens (Token objects) that are used to
88  * add an input to the batch. A single Token is issued when the user calls
89  * the getToken function on the dispatcher.
90  * Token objects cannot be copied (can only be moved). User can call the public
91  * dispatch function on the Token providing a single input value. The dispatch
92  * function returns a folly::Future<ResultT> value that the user can then wait
93  * on to obtain a ResultT value. The ResultT value will only be available once
94  * the dispatch function has been called on all the Tokens in the batch and the
95  * user has called dispatcher.commit() to indicate no more batched transactions
96  * are to be added.
97  * User code pertaining to a task can be run between the point where a token for
98  * the task has been issued and before calling the dispatch function on the
99  * token. Since this code can potentially throw, the token issued for a task
100  * should be moved into this processing code in such a way that if an exception
101  * is thrown and then handled, the token object for the task is destroyed.
102  * The batch query dispatcher will wait until all tokens have either been
103  * destroyed or have had the dispatch function called on them. Leaking an
104  * issued token will cause the batch dispatch to wait forever to happen.
105  *
106  * The AtomicBatchDispatcher object is referred to as the dispatcher below.
107  *
108  * POSSIBLE ERRORS:
109  * 1) The dispatcher is destroyed before calling commit on it, for example
110  *    because the user forgot to call commit OR an exception was thrown
111  *    in user code before the call to commit:
112  *    - The future ResultT has an exception of type ABDCommitNotCalledException
113  *      set for all tokens that were issued by the dispatcher (once all tokens
114  *      are either destroyed or have called dispatch)
115  * 2) Calling the dispatch function more than once on the same Token object
116  *    (or a moved version of the same Token):
117  *    - Subsequent calls to dispatch (after the first one) will throw an
118  *      ABDUsageException exception (the batch itself will not have any errors
119  *      and will get processed)
120  * 3) One/more of the Tokens issued are destroyed before calling dispatch on
121  *    it/them:
122  *    - The future ResultT has an ABDTokenNotDispatchedException set for all
123  *      tokens that were issued by the dispatcher (once all tokens are either
124  *      destroyed or have called dispatch)
125  * 4) dispatcher.getToken() is called after calling dispatcher.commit()
126  *    - the call to getToken() will throw an ABDUsageException exception
127  *      (the batch itself will not have any errors and will get processed).
128  * 5) All tokens were issued and called dispatch, the user provided batch
129  *    dispatch function is called, but that function throws any exception.
130  *    - The future ResultT has exception for all tokens that were issued by
131  *      the dispatcher. The result will contain the wrapped user exception.
132  *
133  * EXAMPLE (There are other ways to achieve this, but this is one example):
134  * - User creates an AtomicBatchDispatcher on stack
135  *     auto dispatcher =
136  *         folly::fibers::createAtomicBatchDispatcher(dispatchFunc, count);
137  * - User creates "count" number of token objects by calling "getToken" count
138  *   number of times
139  *     std::vector<Job> jobs;
140  *     for (size_t i = 0; i < count; ++i) {
141  *       auto token = dispatcher.getToken();
142  *       jobs.push_back(Job(std::move(token), singleInputValueToProcess);
143  *     }
144  * - User calls commit() on the dispatcher to indicate that no new tokens will
145  *   be issued for this batch
146  *     dispatcher.commit();
147  * - Use any single threaded executor that will process the jobs
148  * - On each execution (fiber) preprocess a single "Job" that has been moved in
149  *   from the original vector "jobs". This way if the preprocessing throws
150  *   the Job object being processed is destroyed and so is the token.
151  * - On each execution (fiber) call the dispatch on the token
152  *     auto future = job.token.dispatch(job.input);
153  * - Save the future returned so that eventually you can wait on the results
154  *     ResultT result;
155  *     try {
156  *       result = future.value();
157  *       // future.hasValue() is true
158  *     } catch (...) {
159  *       // future.hasException() is true
160  *       <DO WHATEVER YOU WANT IN CASE OF ERROR> }
161  *     }
162  *
163  * NOTES:
164  * - AtomicBatchDispatcher is not thread safe.
165  * - Works for executors that run tasks on a single thread.
166  */
167 template <typename InputT, typename ResultT>
168 class AtomicBatchDispatcher {
169  private:
170   struct DispatchBaton;
171   friend struct DispatchBaton;
172
173  public:
174   using DispatchFunctionT =
175       folly::Function<std::vector<ResultT>(std::vector<InputT>&&)>;
176
177   class Token {
178    public:
179     explicit Token(std::shared_ptr<DispatchBaton> baton, size_t sequenceNumber);
180
181     Future<ResultT> dispatch(InputT input);
182
183     // Allow moving a Token object
184     Token(Token&&) = default;
185     Token& operator=(Token&&) = default;
186
187     size_t sequenceNumber() const;
188
189    private:
190     // Disallow copying a Token object
191     Token(const Token&) = delete;
192     Token& operator=(const Token&) = delete;
193
194     std::shared_ptr<DispatchBaton> baton_;
195     size_t sequenceNumber_;
196   };
197
198   explicit AtomicBatchDispatcher(DispatchFunctionT&& dispatchFunc);
199
200   ~AtomicBatchDispatcher();
201
202   // numEntries is a *hint* about the number of inputs to expect:
203   // - It is used purely to reserve space for storing vector of inputs etc.,
204   //   so that reeallocation and move copy are reduced / not needed.
205   // - It is provided purely for performance reasons
206   void reserve(size_t numEntries);
207
208   Token getToken();
209
210   void commit();
211
212   // Allow moving an AtomicBatchDispatcher object
213   AtomicBatchDispatcher(AtomicBatchDispatcher&&) = default;
214   AtomicBatchDispatcher& operator=(AtomicBatchDispatcher&&) = default;
215
216  private:
217   // Disallow copying an AtomicBatchDispatcher object
218   AtomicBatchDispatcher(const AtomicBatchDispatcher&) = delete;
219   AtomicBatchDispatcher& operator=(const AtomicBatchDispatcher&) = delete;
220
221   size_t numTokensIssued_;
222   std::shared_ptr<DispatchBaton> baton_;
223 };
224
225 // initialCapacity is a *hint* about the number of inputs to expect:
226 // - It is used purely to reserve space for storing vector of inputs etc.,
227 //   so that reeallocation and move copy are reduced / not needed.
228 // - It is provided purely for performance reasons
229 template <typename InputT, typename ResultT>
230 AtomicBatchDispatcher<InputT, ResultT> createAtomicBatchDispatcher(
231     folly::Function<std::vector<ResultT>(std::vector<InputT>&&)> dispatchFunc,
232     size_t initialCapacity = 0);
233
234 } // namespace fibers
235 } // namespace folly
236
237 #include <folly/fibers/AtomicBatchDispatcher-inl.h>