2 * Copyright 2017 Facebook, Inc.
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
8 * http://www.apache.org/licenses/LICENSE-2.0
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
18 #include <folly/Function.h>
19 #include <folly/futures/Future.h>
20 #include <folly/futures/Promise.h>
30 * BatchDispatcher is useful for batching values while doing I/O.
31 * For example, if you are launching multiple tasks which take a
32 * single id and each task fetches from database, you can use BatchDispatcher
33 * to batch those ids and do a single query requesting all those ids.
35 * To use this, create a BatchDispatcher with a dispatch function
36 * which consumes a vector of values and returns a vector of results
37 * in the same order. Add values to BatchDispatcher using add function,
38 * which returns a future to the result set in your dispatch function.
40 * Implementation Logic:
41 * - using FiberManager as executor example, user creates a
42 * thread_local BatchDispatcher, on which user calls add(value).
43 * - add(value) adds the value in a vector and also schedules a new
44 * task(BatchDispatchFunction) which will read the vector of values and call
45 * user's DispatchFunction() on it.
46 * - assuming the executor queues all the tasks and runs them in order of their
47 * creation time, BatchDispatchFunction will run later than all the tasks
48 * already created. Consequently, all the values that were added in these
49 * tasks will be picked up by BatchDispatchFunction().
52 * - User schedules Task1, Task2, Task3; each of them calls
53 * BatchDispatcher.add() with id1, id2, id3 respectively.
54 * - Executor's state {Task1, Task2, Task3}, BatchDispatcher's state {}
55 * - After Task1 calls BatchDispatcher.add():
56 * Executor's state {Task2, Task3, BatchDispatchFunction},
57 * BatchDispatcher's state {id1}
58 * - After Task2 calls BatchDispatcher.add():
59 * Executor's state {Task3, BatchDispatchFunction},
60 * BatchDispatcher's state {id1, id2}
61 * - After Task3 calls BatchDispatcher.add():
62 * Executor's state {BatchDispatchFunction},
63 * BatchDispatcher's state {id1, id2, id3}
64 * - Now BatchDispatcher calls user's Dispatch function with {id1, id2, id3}
67 * - This only works with executors which run
68 * the tasks in order of their schedule time.
69 * - BatchDispatcher is not thread safe.
71 template <typename ValueT, typename ResultT, typename ExecutorT>
72 class BatchDispatcher {
74 using ValueBatchT = std::vector<ValueT>;
75 using ResultBatchT = std::vector<ResultT>;
76 using PromiseBatchT = std::vector<folly::Promise<ResultT>>;
77 using DispatchFunctionT = folly::Function<ResultBatchT(ValueBatchT&&)>;
79 BatchDispatcher(ExecutorT& executor, DispatchFunctionT dispatchFunc)
80 : executor_(executor),
81 state_(new DispatchState(std::move(dispatchFunc))) {}
83 Future<ResultT> add(ValueT value) {
84 if (state_->values.empty()) {
85 executor_.add([state = state_]() { dispatchFunctionWrapper(*state); });
88 folly::Promise<ResultT> resultPromise;
89 auto resultFuture = resultPromise.getFuture();
91 state_->values.emplace_back(std::move(value));
92 state_->promises.emplace_back(std::move(resultPromise));
98 struct DispatchState {
99 explicit DispatchState(DispatchFunctionT&& dispatchFunction)
100 : dispatchFunc(std::move(dispatchFunction)) {}
102 DispatchFunctionT dispatchFunc;
104 PromiseBatchT promises;
107 static void dispatchFunctionWrapper(DispatchState& state) {
109 PromiseBatchT promises;
110 state.values.swap(values);
111 state.promises.swap(promises);
114 auto results = state.dispatchFunc(std::move(values));
115 if (results.size() != promises.size()) {
116 throw std::logic_error(
117 "Unexpected number of results returned from dispatch function");
120 for (size_t i = 0; i < promises.size(); i++) {
121 promises[i].setValue(std::move(results[i]));
123 } catch (const std::exception& ex) {
124 for (size_t i = 0; i < promises.size(); i++) {
125 promises[i].setException(
126 exception_wrapper(std::current_exception(), ex));
129 for (size_t i = 0; i < promises.size(); i++) {
130 promises[i].setException(exception_wrapper(std::current_exception()));
135 ExecutorT& executor_;
136 std::shared_ptr<DispatchState> state_;
138 } // namespace fibers