BatchDispatcher exception handling
[folly.git] / folly / fibers / BatchDispatcher.h
1 /*
2  * Copyright 2016 Facebook, Inc.
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  *   http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 #pragma once
17
18 #include <folly/futures/Future.h>
19 #include <folly/futures/Promise.h>
20
21 namespace folly {
22 namespace fibers {
23
24 /**
25  * BatchDispatcher is useful for batching values while doing I/O.
26  * For example, if you are launching multiple tasks which take a
27  * single id and each task fetches from database, you can use BatchDispatcher
28  * to batch those ids and do a single query requesting all those ids.
29  *
30  * To use this, create a BatchDispatcher with a dispatch function
31  * which consumes a vector of values and returns a vector of results
32  * in the same order. Add values to BatchDispatcher using add function,
33  * which returns a future to the result set in your dispatch function.
34  *
35  * Implementation Logic:
36  *  - using FiberManager as executor example, user creates a
37  *    thread_local BatchDispatcher, on which user calls add(value).
38  *  - add(value) adds the value in a vector and also schedules a new
39  *    task(BatchDispatchFunction) which will read the vector of values and call
40  *    user's DispatchFunction() on it.
41  *  - assuming the executor queues all the task and runs them in order of their
42  *    creation time, then BatchDispatcher will run later than all the tasks
43  *    already created. Depending on this, all the values were added in these
44  *    tasks would be picked up by BatchDispatchFunction()
45  *
46  * Example:
47  *  - User schedules Task1, Task2, Task3 each of them calls BatchDispatch.add()
48  *    with id1, id2, id3 respectively.
49  *  - Executor's state {Task1, Task2, Task3}, BatchDispatchers state {}
50  *  - After Task1 calls BatchDispatcher.add():
51  *    Executor's state {Task2, Task3, BatchDispatchFunction},
52  *    BatchDispatcher's state {id1}
53  *  - After Task2 calls BatchDispatcher.add():
54  *    Executor's state {Task3, BatchDispatchFunction},
55  *    BatchDispatcher's state {id1, id2}
56  *  - After Task3 calls BatchDispatcher.add():
57  *    Executor's state {BatchDispatchFunction},
58  *    BatchDispatcher's state {id1, id2, id3}
59  *  - Now BatchDispatcher calls user's Dispatch function with {id1, id2, id3}
60  *
61  * Note:
62  *  - This only works with executors which runs
63  *    the tasks in order of their schedule time.
64  *  - BatchDispatcher is not thread safe.
65  */
66 template <typename ValueT, typename ResultT, typename ExecutorT>
67 class BatchDispatcher {
68  public:
69   using ValueBatchT = std::vector<ValueT>;
70   using ResultBatchT = std::vector<ResultT>;
71   using PromiseBatchT = std::vector<folly::Promise<ResultT>>;
72   using DispatchFunctionT = folly::Function<ResultBatchT(ValueBatchT&&)>;
73
74   BatchDispatcher(ExecutorT& executor, DispatchFunctionT dispatchFunc)
75       : executor_(executor),
76         state_(new DispatchState(std::move(dispatchFunc))) {}
77
78   Future<ResultT> add(ValueT value) {
79     if (state_->values.empty()) {
80       executor_.add([state = state_]() { dispatchFunctionWrapper(*state); });
81     }
82
83     folly::Promise<ResultT> resultPromise;
84     auto resultFuture = resultPromise.getFuture();
85
86     state_->values.emplace_back(std::move(value));
87     state_->promises.emplace_back(std::move(resultPromise));
88
89     return resultFuture;
90   }
91
92  private:
93   struct DispatchState {
94     explicit DispatchState(DispatchFunctionT&& dispatchFunction)
95         : dispatchFunc(std::move(dispatchFunction)) {}
96
97     DispatchFunctionT dispatchFunc;
98     ValueBatchT values;
99     PromiseBatchT promises;
100   };
101
102   static void dispatchFunctionWrapper(DispatchState& state) {
103     ValueBatchT values;
104     PromiseBatchT promises;
105     state.values.swap(values);
106     state.promises.swap(promises);
107
108     try {
109       auto results = state.dispatchFunc(std::move(values));
110       if (results.size() != promises.size()) {
111         throw std::logic_error(
112             "Unexpected number of results returned from dispatch function");
113       }
114
115       for (size_t i = 0; i < promises.size(); i++) {
116         promises[i].setValue(std::move(results[i]));
117       }
118     } catch (const std::exception& ex) {
119       for (size_t i = 0; i < promises.size(); i++) {
120         promises[i].setException(
121             exception_wrapper(std::current_exception(), ex));
122       }
123     } catch (...) {
124       for (size_t i = 0; i < promises.size(); i++) {
125         promises[i].setException(exception_wrapper(std::current_exception()));
126       }
127     }
128   }
129
130   ExecutorT& executor_;
131   std::shared_ptr<DispatchState> state_;
132 };
133 }
134 }