2 * Copyright 2014 Facebook, Inc.
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
8 * http://www.apache.org/licenses/LICENSE-2.0
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
22 #include <folly/wangle/detail/Core.h>
23 #include <folly/Baton.h>
25 namespace folly { namespace wangle {
29 static const bool value = false;
33 struct isFuture<Future<T> > {
34 static const bool value = true;
38 Future<T>::Future(Future<T>&& other) noexcept : core_(nullptr) {
39 *this = std::move(other);
43 Future<T>& Future<T>::operator=(Future<T>&& other) {
44 std::swap(core_, other.core_);
49 Future<T>::~Future() {
54 void Future<T>::detach() {
56 core_->detachFuture();
62 void Future<T>::throwIfInvalid() const {
69 void Future<T>::setCallback_(F&& func) {
71 core_->setCallback(std::move(func));
76 typename std::enable_if<
77 !isFuture<typename std::result_of<F(Try<T>&&)>::type>::value,
78 Future<typename std::result_of<F(Try<T>&&)>::type> >::type
79 Future<T>::then(F&& func) {
80 typedef typename std::result_of<F(Try<T>&&)>::type B;
84 // wrap these so we can move them into the lambda
85 folly::MoveWrapper<Promise<B>> p;
86 folly::MoveWrapper<F> funcm(std::forward<F>(func));
88 // grab the Future now before we lose our handle on the Promise
89 auto f = p->getFuture();
91 /* This is a bit tricky.
93 We can't just close over *this in case this Future gets moved. So we
94 make a new dummy Future. We could figure out something more
95 sophisticated that avoids making a new Future object when it can, as an
96 optimization. But this is correct.
98 core_ can't be moved, it is explicitly disallowed (as is copying). But
99 if there's ever a reason to allow it, this is one place that makes that
100 assumption and would need to be fixed. We use a standard shared pointer
101 for core_ (by copying it in), which means in essence obj holds a shared
102 pointer to itself. But this shouldn't leak because Promise will not
103 outlive the continuation, because Promise will setException() with a
104 broken Promise if it is destructed before completed. We could use a
105 weak pointer but it would have to be converted to a shared pointer when
106 func is executed (because the Future returned by func may possibly
107 persist beyond the callback, if it gets moved), and so it is an
108 optimization to just make it shared from the get-go.
110 We have to move in the Promise and func using the MoveWrapper
111 hack. (func could be copied but it's a big drag on perf).
113 Two subtle but important points about this design. detail::Core has no
114 back pointers to Future or Promise, so if Future or Promise get moved
115 (and they will be moved in performant code) we don't have to do
116 anything fancy. And because we store the continuation in the
117 detail::Core, not in the Future, we can execute the continuation even
118 after the Future has gone out of scope. This is an intentional design
119 decision. It is likely we will want to be able to cancel a continuation
120 in some circumstances, but I think it should be explicit not implicit
121 in the destruction of the Future used to create it.
124 [p, funcm](Try<T>&& t) mutable {
126 return (*funcm)(std::move(t));
135 typename std::enable_if<
136 isFuture<typename std::result_of<F(Try<T>&&)>::type>::value,
137 Future<typename std::result_of<F(Try<T>&&)>::type::value_type> >::type
138 Future<T>::then(F&& func) {
139 typedef typename std::result_of<F(Try<T>&&)>::type::value_type B;
143 // wrap these so we can move them into the lambda
144 folly::MoveWrapper<Promise<B>> p;
145 folly::MoveWrapper<F> funcm(std::forward<F>(func));
147 // grab the Future now before we lose our handle on the Promise
148 auto f = p->getFuture();
151 [p, funcm](Try<T>&& t) mutable {
153 auto f2 = (*funcm)(std::move(t));
154 // that didn't throw, now we can steal p
155 f2.setCallback_([p](Try<B>&& b) mutable {
156 p->fulfilTry(std::move(b));
159 p->setException(std::current_exception());
167 Future<void> Future<T>::then() {
168 return then([] (Try<T>&& t) {});
172 typename std::add_lvalue_reference<T>::type Future<T>::value() {
175 return core_->value();
179 typename std::add_lvalue_reference<const T>::type Future<T>::value() const {
182 return core_->value();
186 Try<T>& Future<T>::getTry() {
189 return core_->getTry();
193 template <typename Executor>
194 inline Future<T> Future<T>::via(Executor* executor) {
198 core_->setExecutor(executor);
200 return std::move(*this);
204 bool Future<T>::isReady() const {
206 return core_->ready();
212 Future<typename std::decay<T>::type> makeFuture(T&& t) {
213 Promise<typename std::decay<T>::type> p;
214 auto f = p.getFuture();
215 p.setValue(std::forward<T>(t));
219 inline // for multiple translation units
220 Future<void> makeFuture() {
222 auto f = p.getFuture();
230 typename std::enable_if<!std::is_reference<F>::value, bool>::type sdf)
231 -> Future<decltype(func())> {
232 Promise<decltype(func())> p;
233 auto f = p.getFuture();
242 auto makeFutureTry(F const& func) -> Future<decltype(func())> {
244 return makeFutureTry(std::move(copy));
248 Future<T> makeFuture(std::exception_ptr const& e) {
250 auto f = p.getFuture();
255 template <class T, class E>
256 typename std::enable_if<std::is_base_of<std::exception, E>::value,
258 makeFuture(E const& e) {
260 auto f = p.getFuture();
261 p.fulfil([&]() -> T { throw e; });
266 Future<T> makeFuture(Try<T>&& t) {
268 return makeFuture<T>(std::move(t.value()));
270 return makeFuture<T>(std::current_exception());
275 inline Future<void> makeFuture(Try<void>&& t) {
280 return makeFuture<void>(std::current_exception());
286 template <typename... Fs>
287 typename detail::VariadicContext<
288 typename std::decay<Fs>::type::value_type...>::type
292 new detail::VariadicContext<typename std::decay<Fs>::type::value_type...>();
293 ctx->total = sizeof...(fs);
294 auto f_saved = ctx->p.getFuture();
295 detail::whenAllVariadicHelper(ctx,
296 std::forward<typename std::decay<Fs>::type>(fs)...);
297 return std::move(f_saved);
302 template <class InputIterator>
305 Try<typename std::iterator_traits<InputIterator>::value_type::value_type>>>
306 whenAll(InputIterator first, InputIterator last)
309 typename std::iterator_traits<InputIterator>::value_type::value_type T;
311 auto n = std::distance(first, last);
313 return makeFuture(std::vector<Try<T>>());
316 auto ctx = new detail::WhenAllContext<T>();
319 ctx->results.resize(ctx->total);
321 auto f_saved = ctx->p.getFuture();
323 for (size_t i = 0; first != last; ++first, ++i) {
325 f.setCallback_([ctx, i](Try<T>&& t) {
326 ctx->results[i] = std::move(t);
327 if (++ctx->count == ctx->total) {
328 ctx->p.setValue(std::move(ctx->results));
334 return std::move(f_saved);
337 template <class InputIterator>
342 std::iterator_traits<InputIterator>::value_type::value_type> > >
343 whenAny(InputIterator first, InputIterator last) {
345 typename std::iterator_traits<InputIterator>::value_type::value_type T;
347 auto ctx = new detail::WhenAnyContext<T>(std::distance(first, last));
348 auto f_saved = ctx->p.getFuture();
350 for (size_t i = 0; first != last; first++, i++) {
352 f.setCallback_([i, ctx](Try<T>&& t) {
353 if (!ctx->done.exchange(true)) {
354 ctx->p.setValue(std::make_pair(i, std::move(t)));
360 return std::move(f_saved);
363 template <class InputIterator>
364 Future<std::vector<std::pair<size_t, Try<typename
365 std::iterator_traits<InputIterator>::value_type::value_type>>>>
366 whenN(InputIterator first, InputIterator last, size_t n) {
368 std::iterator_traits<InputIterator>::value_type::value_type T;
369 typedef std::vector<std::pair<size_t, Try<T>>> V;
376 auto ctx = std::make_shared<ctx_t>();
379 // for each completed Future, increase count and add to vector, until we
380 // have n completed futures at which point we fulfil our Promise with the
385 it->then([ctx, n, i](Try<T>&& t) {
387 auto c = ++ctx->completed;
389 assert(ctx->v.size() < n);
390 v.push_back(std::make_pair(i, std::move(t)));
392 ctx->p.fulfilTry(Try<V>(std::move(v)));
402 ctx->p.setException(std::runtime_error("Not enough futures"));
405 return ctx->p.getFuture();
408 template <typename T>
410 waitWithSemaphore(Future<T>&& f) {
412 auto done = f.then([&](Try<T> &&t) {
414 return std::move(t.value());
417 while (!done.isReady()) {
418 // There's a race here between the return here and the actual finishing of
419 // the future. f is completed, but the setup may not have finished on done
420 // after the baton has posted.
421 std::this_thread::yield();
427 inline Future<void> waitWithSemaphore<void>(Future<void>&& f) {
429 auto done = f.then([&](Try<void> &&t) {
434 while (!done.isReady()) {
435 // There's a race here between the return here and the actual finishing of
436 // the future. f is completed, but the setup may not have finished on done
437 // after the baton has posted.
438 std::this_thread::yield();
443 template <typename T, class Duration>
445 waitWithSemaphore(Future<T>&& f, Duration timeout) {
446 auto baton = std::make_shared<Baton<>>();
447 auto done = f.then([baton](Try<T> &&t) {
449 return std::move(t.value());
451 baton->timed_wait(std::chrono::system_clock::now() + timeout);
455 template <class Duration>
457 waitWithSemaphore(Future<void>&& f, Duration timeout) {
458 auto baton = std::make_shared<Baton<>>();
459 auto done = f.then([baton](Try<void> &&t) {
463 baton->timed_wait(std::chrono::system_clock::now() + timeout);
469 // I haven't included a Future<T&> specialization because I don't forsee us
470 // using it, however it is not difficult to add when needed. Refer to
471 // Future<void> for guidance. std::future and boost::future code would also be