2017
[folly.git] / folly / fibers / AtomicBatchDispatcher.h
1 /*
2  * Copyright 2017 Facebook, Inc.
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  *   http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 #pragma once
17
18 #include <folly/Function.h>
19 #include <folly/Optional.h>
20 #include <folly/futures/Future.h>
21 #include <folly/futures/Promise.h>
22 #include <memory>
23 #include <utility>
24 #include <vector>
25
26 namespace folly {
27 namespace fibers {
28
29 /**
30  * AtomicBatchDispatcher should be used if you want to process fiber tasks in
31  * parallel, but require to synchronize them at some point. The canonical
32  * example is to create a database transaction dispatch round. This API notably
33  * enforces that all tasks in the batch have reached the synchronization point
34  * before the user provided dispatch function is called with all the inputs
35  * provided in one function call. It also provides a guarantee that the inputs
36  * in the vector of inputs passed to the user provided dispatch function will be
37  * in the same order as the order in which the token for the job was issued.
38  *
39  * Use this when you want all the inputs in the batch to be processed by a
40  * single function call to the user provided dispatch function.
41  * The user provided dispatch function takes a vector of InputT as input and
42  * returns a vector of ResultT.
43  * To use an AtomicBatchDispatcher, create it by providing a dispatch function:
44  * TO EITHER the constructor of the AtomicBatchDispatcher class
45  * (can call reserve method on the dispatcher to reserve space (for number of
46  *  inputs expected)),
47  * OR the createAtomicBatchDispatcher function in folly::fibers namespace
48  *    (optionally specify an initial capacity (for number of inputs expected)).
49  * The AtomicBatchDispatcher object created using this call (dispatcher),
50  * is the only object that can issue tokens (Token objects) that are used to
51  * add an input to the batch. A single Token is issued when the user calls
52  * the getToken function on the dispatcher.
53  * Token objects cannot be copied (can only be moved). User can call the public
54  * dispatch function on the Token providing a single input value. The dispatch
55  * function returns a folly::Future<ResultT> value that the user can then wait
56  * on to obtain a ResultT value. The ResultT value will only be available once
57  * the dispatch function has been called on all the Tokens in the batch and the
58  * user has called dispatcher.commit() to indicate no more batched transactions
59  * are to be added.
60  * User code pertaining to a task can be run between the point where a token for
61  * the task has been issued and before calling the dispatch function on the
62  * token. Since this code can potentially throw, the token issued for a task
63  * should be moved into this processing code in such a way that if an exception
64  * is thrown and then handled, the token object for the task is destroyed.
65  * The batch query dispatcher will wait until all tokens have either been
66  * destroyed or have had the dispatch function called on them. Leaking an
67  * issued token will cause the batch dispatch to wait forever to happen.
68  *
69  * The AtomicBatchDispatcher object is referred to as the dispatcher below.
70  *
71  * POSSIBLE ERRORS:
72  * 1) The dispatcher is destroyed before calling commit on it, for example
73  *    because the user forgot to call commit OR an exception was thrown
74  *    in user code before the call to commit:
75  *    - The future ResultT has an exception of type std::logic_error set for all
76  *      tokens that were issued by the dispatcher (once all tokens are either
77  *      destroyed or have called dispatch)
78  * 2) Calling the dispatch function more than once on the same Token object
79  *    (or a moved version of the same Token):
80  *    - Subsequent calls to dispatch (after the first one) will throw an
81  *      std::logic_error exception (the batch itself will not have any errors
82  *      and will get processed)
83  * 3) One/more of the Tokens issued are destroyed before calling dispatch on
84  *    it/them:
85  *    - The future ResultT has an exception of type std::logic_error set for all
86  *      tokens that were issued by the dispatcher (once all tokens are either
87  *      destroyed or have called dispatch)
88  * 4) dispatcher.getToken() is called after calling dispatcher.commit()
89  *    - the call to getToken() will throw an std::logic_error exception
90  *      (the batch itself will not have any errors and will get processed).
91  * 5) All tokens were issued and called dispatch, the user provided batch
92  *    dispatch function is called, but that function throws any exception.
93  *    - The future ResultT has exception for all tokens that were issued by
94  *      the dispatcher. The result will contain the wrapped user exception.
95  *
96  * EXAMPLE (There are other ways to achieve this, but this is one example):
97  * - User creates an AtomicBatchDispatcher on stack
98  *     auto dispatcher =
99  *         folly::fibers::createAtomicBatchDispatcher(dispatchFunc, count);
100  * - User creates "count" number of token objects by calling "getToken" count
101  *   number of times
102  *     std::vector<Job> jobs;
103  *     for (size_t i = 0; i < count; ++i) {
104  *       auto token = dispatcher.getToken();
105  *       jobs.push_back(Job(std::move(token), singleInputValueToProcess);
106  *     }
107  * - User calls commit() on the dispatcher to indicate that no new tokens will
108  *   be issued for this batch
109  *     dispatcher.commit();
110  * - Use any single threaded executor that will process the jobs
111  * - On each execution (fiber) preprocess a single "Job" that has been moved in
112  *   from the original vector "jobs". This way if the preprocessing throws
113  *   the Job object being processed is destroyed and so is the token.
114  * - On each execution (fiber) call the dispatch on the token
115  *     auto future = job.token.dispatch(job.input);
116  * - Save the future returned so that eventually you can wait on the results
117  *     ResultT result;
118  *     try {
119  *       result = future.value();
120  *       // future.hasValue() is true
121  *     } catch (...) {
122  *       // future.hasException() is true
123  *       <DO WHATEVER YOU WANT IN CASE OF ERROR> }
124  *     }
125  *
126  * NOTES:
127  * - AtomicBatchDispatcher is not thread safe.
128  * - Works for executors that run tasks on a single thread.
129  */
130 template <typename InputT, typename ResultT>
131 class AtomicBatchDispatcher {
132  private:
133   struct DispatchBaton;
134   friend struct DispatchBaton;
135
136  public:
137   using DispatchFunctionT =
138       folly::Function<std::vector<ResultT>(std::vector<InputT>&&)>;
139
140   class Token {
141    public:
142     explicit Token(std::shared_ptr<DispatchBaton> baton, size_t sequenceNumber);
143
144     Future<ResultT> dispatch(InputT input);
145
146     // Allow moving a Token object
147     Token(Token&&) = default;
148     Token& operator=(Token&&) = default;
149
150     size_t sequenceNumber() const;
151
152    private:
153     // Disallow copying a Token object
154     Token(const Token&) = delete;
155     Token& operator=(const Token&) = delete;
156
157     std::shared_ptr<DispatchBaton> baton_;
158     size_t sequenceNumber_;
159   };
160
161   explicit AtomicBatchDispatcher(DispatchFunctionT&& dispatchFunc);
162
163   ~AtomicBatchDispatcher();
164
165   // numEntries is a *hint* about the number of inputs to expect:
166   // - It is used purely to reserve space for storing vector of inputs etc.,
167   //   so that reeallocation and move copy are reduced / not needed.
168   // - It is provided purely for performance reasons
169   void reserve(size_t numEntries);
170
171   Token getToken();
172
173   void commit();
174
175   // Allow moving an AtomicBatchDispatcher object
176   AtomicBatchDispatcher(AtomicBatchDispatcher&&) = default;
177   AtomicBatchDispatcher& operator=(AtomicBatchDispatcher&&) = default;
178
179  private:
180   // Disallow copying an AtomicBatchDispatcher object
181   AtomicBatchDispatcher(const AtomicBatchDispatcher&) = delete;
182   AtomicBatchDispatcher& operator=(const AtomicBatchDispatcher&) = delete;
183
184   size_t numTokensIssued_;
185   std::shared_ptr<DispatchBaton> baton_;
186 };
187
188 // initialCapacity is a *hint* about the number of inputs to expect:
189 // - It is used purely to reserve space for storing vector of inputs etc.,
190 //   so that reeallocation and move copy are reduced / not needed.
191 // - It is provided purely for performance reasons
192 template <typename InputT, typename ResultT>
193 AtomicBatchDispatcher<InputT, ResultT> createAtomicBatchDispatcher(
194     folly::Function<std::vector<ResultT>(std::vector<InputT>&&)> dispatchFunc,
195     size_t initialCapacity = 0);
196
197 } // namespace fibers
198 } // namespace folly
199
200 #include <folly/fibers/AtomicBatchDispatcher-inl.h>