dd2149cb1c893e8fde6417e33340c27f8fb5789b
[folly.git] / folly / fibers / BatchDispatcher.h
1 /*
2  * Copyright 2017 Facebook, Inc.
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  *   http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 #pragma once
17
18 #include <folly/Function.h>
19 #include <folly/futures/Future.h>
20 #include <folly/futures/Promise.h>
21 #include <exception>
22 #include <memory>
23 #include <stdexcept>
24 #include <vector>
25
26 namespace folly {
27 namespace fibers {
28
29 /**
30  * BatchDispatcher is useful for batching values while doing I/O.
31  * For example, if you are launching multiple tasks which take a
32  * single id and each task fetches from database, you can use BatchDispatcher
33  * to batch those ids and do a single query requesting all those ids.
34  *
35  * To use this, create a BatchDispatcher with a dispatch function
36  * which consumes a vector of values and returns a vector of results
37  * in the same order. Add values to BatchDispatcher using add function,
38  * which returns a future to the result set in your dispatch function.
39  *
40  * Implementation Logic:
41  *  - using FiberManager as executor example, user creates a
42  *    thread_local BatchDispatcher, on which user calls add(value).
43  *  - add(value) adds the value in a vector and also schedules a new
44  *    task(BatchDispatchFunction) which will read the vector of values and call
45  *    user's DispatchFunction() on it.
46  *  - assuming the executor queues all the task and runs them in order of their
47  *    creation time, then BatchDispatcher will run later than all the tasks
48  *    already created. Depending on this, all the values were added in these
49  *    tasks would be picked up by BatchDispatchFunction()
50  *
51  * Example:
52  *  - User schedules Task1, Task2, Task3 each of them calls BatchDispatch.add()
53  *    with id1, id2, id3 respectively.
54  *  - Executor's state {Task1, Task2, Task3}, BatchDispatchers state {}
55  *  - After Task1 calls BatchDispatcher.add():
56  *    Executor's state {Task2, Task3, BatchDispatchFunction},
57  *    BatchDispatcher's state {id1}
58  *  - After Task2 calls BatchDispatcher.add():
59  *    Executor's state {Task3, BatchDispatchFunction},
60  *    BatchDispatcher's state {id1, id2}
61  *  - After Task3 calls BatchDispatcher.add():
62  *    Executor's state {BatchDispatchFunction},
63  *    BatchDispatcher's state {id1, id2, id3}
64  *  - Now BatchDispatcher calls user's Dispatch function with {id1, id2, id3}
65  *
66  * Note:
67  *  - This only works with executors which runs
68  *    the tasks in order of their schedule time.
69  *  - BatchDispatcher is not thread safe.
70  */
71 template <typename ValueT, typename ResultT, typename ExecutorT>
72 class BatchDispatcher {
73  public:
74   using ValueBatchT = std::vector<ValueT>;
75   using ResultBatchT = std::vector<ResultT>;
76   using PromiseBatchT = std::vector<folly::Promise<ResultT>>;
77   using DispatchFunctionT = folly::Function<ResultBatchT(ValueBatchT&&)>;
78
79   BatchDispatcher(ExecutorT& executor, DispatchFunctionT dispatchFunc)
80       : executor_(executor),
81         state_(new DispatchState(std::move(dispatchFunc))) {}
82
83   Future<ResultT> add(ValueT value) {
84     if (state_->values.empty()) {
85       executor_.add([state = state_]() { dispatchFunctionWrapper(*state); });
86     }
87
88     folly::Promise<ResultT> resultPromise;
89     auto resultFuture = resultPromise.getFuture();
90
91     state_->values.emplace_back(std::move(value));
92     state_->promises.emplace_back(std::move(resultPromise));
93
94     return resultFuture;
95   }
96
97  private:
98   struct DispatchState {
99     explicit DispatchState(DispatchFunctionT&& dispatchFunction)
100         : dispatchFunc(std::move(dispatchFunction)) {}
101
102     DispatchFunctionT dispatchFunc;
103     ValueBatchT values;
104     PromiseBatchT promises;
105   };
106
107   static void dispatchFunctionWrapper(DispatchState& state) {
108     ValueBatchT values;
109     PromiseBatchT promises;
110     state.values.swap(values);
111     state.promises.swap(promises);
112
113     try {
114       auto results = state.dispatchFunc(std::move(values));
115       if (results.size() != promises.size()) {
116         throw std::logic_error(
117             "Unexpected number of results returned from dispatch function");
118       }
119
120       for (size_t i = 0; i < promises.size(); i++) {
121         promises[i].setValue(std::move(results[i]));
122       }
123     } catch (const std::exception& ex) {
124       for (size_t i = 0; i < promises.size(); i++) {
125         promises[i].setException(
126             exception_wrapper(std::current_exception(), ex));
127       }
128     } catch (...) {
129       for (size_t i = 0; i < promises.size(); i++) {
130         promises[i].setException(exception_wrapper(std::current_exception()));
131       }
132     }
133   }
134
135   ExecutorT& executor_;
136   std::shared_ptr<DispatchState> state_;
137 };
138 }
139 }