2 * Copyright 2016 Facebook, Inc.
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
8 * http://www.apache.org/licenses/LICENSE-2.0
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
18 #include <folly/futures/Future.h>
19 #include <folly/futures/Promise.h>
25 * BatchDispatcher is useful for batching values while doing I/O.
26 * For example, if you are launching multiple tasks which take a
27 * single id and each task fetches from database, you can use BatchDispatcher
28 * to batch those ids and do a single query requesting all those ids.
30 * To use this, create a BatchDispatcher with a dispatch function
31 * which consumes a vector of values and returns a vector of results
32 * in the same order. Add values to BatchDispatcher using the add() function,
33 * which returns a future for the corresponding result from your dispatch function.
35 * Implementation Logic:
36 * - Using FiberManager as an example executor, the user creates a
37 * thread_local BatchDispatcher, on which the user calls add(value).
38 * - add(value) appends the value to a vector and, if the vector was empty,
39 * also schedules a new task (BatchDispatchFunction) which will read the vector
40 * of values and call the user's DispatchFunction() on it.
41 * - Assuming the executor queues all the tasks and runs them in order of their
42 * creation time, BatchDispatchFunction will run later than all the tasks
43 * already created. Consequently, all the values that were added in these
44 * tasks will be picked up by BatchDispatchFunction().
47 * - User schedules Task1, Task2, Task3; each of them calls BatchDispatcher.add()
48 * with id1, id2, id3 respectively.
49 * - Executor's state {Task1, Task2, Task3}, BatchDispatcher's state {}
50 * - After Task1 calls BatchDispatcher.add():
51 * Executor's state {Task2, Task3, BatchDispatchFunction},
52 * BatchDispatcher's state {id1}
53 * - After Task2 calls BatchDispatcher.add():
54 * Executor's state {Task3, BatchDispatchFunction},
55 * BatchDispatcher's state {id1, id2}
56 * - After Task3 calls BatchDispatcher.add():
57 * Executor's state {BatchDispatchFunction},
58 * BatchDispatcher's state {id1, id2, id3}
59 * - Now BatchDispatcher calls user's Dispatch function with {id1, id2, id3}
62 * - This only works with executors which run
63 * the tasks in the order in which they were scheduled.
64 * - BatchDispatcher is not thread safe.
66 template <typename ValueT, typename ResultT, typename ExecutorT>
67 class BatchDispatcher {
69 using ValueBatchT = std::vector<ValueT>;
70 using ResultBatchT = std::vector<ResultT>;
71 using PromiseBatchT = std::vector<folly::Promise<ResultT>>;
72 using DispatchFunctionT = folly::Function<ResultBatchT(ValueBatchT&&)>;
74 BatchDispatcher(ExecutorT& executor, DispatchFunctionT dispatchFunc)
75 : executor_(executor),
76 state_(new DispatchState(std::move(dispatchFunc))) {}
78 Future<ResultT> add(ValueT value) {
79 if (state_->values.empty()) {
80 executor_.add([state = state_]() { dispatchFunctionWrapper(*state); });
83 folly::Promise<ResultT> resultPromise;
84 auto resultFuture = resultPromise.getFuture();
86 state_->values.emplace_back(std::move(value));
87 state_->promises.emplace_back(std::move(resultPromise));
93 struct DispatchState {
94 explicit DispatchState(DispatchFunctionT&& dispatchFunction)
95 : dispatchFunc(std::move(dispatchFunction)) {}
97 DispatchFunctionT dispatchFunc;
99 PromiseBatchT promises;
102 static void dispatchFunctionWrapper(DispatchState& state) {
104 PromiseBatchT promises;
105 state.values.swap(values);
106 state.promises.swap(promises);
109 auto results = state.dispatchFunc(std::move(values));
110 if (results.size() != promises.size()) {
111 throw std::logic_error(
112 "Unexpected number of results returned from dispatch function");
115 for (size_t i = 0; i < promises.size(); i++) {
116 promises[i].setValue(std::move(results[i]));
118 } catch (const std::exception& ex) {
119 for (size_t i = 0; i < promises.size(); i++) {
120 promises[i].setException(
121 exception_wrapper(std::current_exception(), ex));
124 for (size_t i = 0; i < promises.size(); i++) {
125 promises[i].setException(exception_wrapper(std::current_exception()));
130 ExecutorT& executor_;
131 std::shared_ptr<DispatchState> state_;