X-Git-Url: http://plrg.eecs.uci.edu/git/?p=folly.git;a=blobdiff_plain;f=folly%2Ffutures%2FFuture-inl.h;h=59498af986ef9768a745b502e7968af11e32a555;hp=927a3142b8ed94a056b3d3f8beac50359d039878;hb=d167b41554b659b3c0966210b17f6e038eb11648;hpb=5b531cbb45884ad8a27891cf3170cd0286c3183c diff --git a/folly/futures/Future-inl.h b/folly/futures/Future-inl.h index 927a3142..59498af9 100644 --- a/folly/futures/Future-inl.h +++ b/folly/futures/Future-inl.h @@ -44,24 +44,17 @@ Future& Future::operator=(Future&& other) noexcept { } template -template ::value, void*>::type> -Future::Future(T2&& val) : core_(nullptr) { - Promise p; - p.setValue(std::forward(val)); - *this = p.getFuture(); -} +template +Future::Future(T2&& val) + : core_(new detail::Core(Try(std::forward(val)))) {} template template ::value, int>::type> -Future::Future() : core_(nullptr) { - Promise p; - p.setValue(); - *this = p.getFuture(); -} +Future::Future() + : core_(new detail::Core(Try())) {} template @@ -117,6 +110,7 @@ Future::thenImplementation(F func, detail::argResult) { // wrap these so we can move them into the lambda folly::MoveWrapper> p; + p->setInterruptHandler(core_->getInterruptHandler()); folly::MoveWrapper funcm(std::forward(func)); // grab the Future now before we lose our handle on the Promise @@ -226,18 +220,17 @@ Future::then(R(Caller::*func)(Args...), Caller *instance) { }); } -// TODO(6838553) -#ifndef __clang__ template -template -auto Future::then(Executor* x, Args&&... args) - -> decltype(this->then(std::forward(args)...)) +template +auto Future::then(Executor* x, Arg&& arg, Args&&... args) + -> decltype(this->then(std::forward(arg), + std::forward(args)...)) { auto oldX = getExecutor(); setExecutor(x); - return this->then(std::forward(args)...).via(oldX); + return this->then(std::forward(arg), std::forward(args)...). + via(oldX); } -#endif template Future Future::then() { @@ -424,24 +417,33 @@ Optional> Future::poll() { } template -template -inline Future Future::via(Executor* executor) && { +inline Future Future::via(Executor* executor, int8_t priority) && { throwIfInvalid(); - setExecutor(executor); + setExecutor(executor, priority); return std::move(*this); } template -template -inline Future Future::via(Executor* executor) & { +inline Future Future::via(Executor* executor, int8_t priority) & { throwIfInvalid(); MoveWrapper> p; auto f = p->getFuture(); then([p](Try&& t) mutable { p->setTry(std::move(t)); }); - return std::move(f).via(executor); + return std::move(f).via(executor, priority); +} + + +template +auto via(Executor* x, Func func) + -> Future::Inner> +// this would work, if not for Future :-/ +// -> decltype(via(x).then(func)) +{ + // TODO make this actually more performant. :-P #7260175 + return via(x).then(func); } template @@ -459,16 +461,12 @@ void Future::raise(exception_wrapper exception) { template Future::type> makeFuture(T&& t) { - Promise::type> p; - p.setValue(std::forward(t)); - return p.getFuture(); + return makeFuture(Try::type>(std::forward(t))); } inline // for multiple translation units Future makeFuture() { - Promise p; - p.setValue(); - return p.getFuture(); + return makeFuture(Try()); } template @@ -476,81 +474,69 @@ auto makeFutureWith( F&& func, typename std::enable_if::value, bool>::type sdf) -> Future { - Promise p; - p.setWith( - [&func]() { - return (func)(); - }); - return p.getFuture(); + return makeFuture(makeTryWith([&func]() { + return (func)(); + })); } template auto makeFutureWith(F const& func) -> Future { F copy = func; - return makeFutureWith(std::move(copy)); + return makeFuture(makeTryWith(std::move(copy))); } template Future makeFuture(std::exception_ptr const& e) { - Promise p; - p.setException(e); - return p.getFuture(); + return makeFuture(Try(e)); } template Future makeFuture(exception_wrapper ew) { - Promise p; - p.setException(std::move(ew)); - return p.getFuture(); + return makeFuture(Try(std::move(ew))); } template typename std::enable_if::value, Future>::type makeFuture(E const& e) { - Promise p; - p.setException(make_exception_wrapper(e)); - return p.getFuture(); + return makeFuture(Try(make_exception_wrapper(e))); } template Future makeFuture(Try&& t) { - Promise::type> p; - p.setTry(std::move(t)); - return p.getFuture(); + return Future(new detail::Core(std::move(t))); } -template <> -inline Future makeFuture(Try&& t) { - if (t.hasException()) { - return makeFuture(std::move(t.exception())); - } else { - return makeFuture(); - } +// via +Future via(Executor* executor, int8_t priority) { + return makeFuture().via(executor, priority); } -// via -template -Future via(Executor* executor) { - return makeFuture().via(executor); +// mapSetCallback calls func(i, Try) when every future completes + +template +void mapSetCallback(InputIterator first, InputIterator last, F func) { + for (size_t i = 0; first != last; ++first, ++i) { + first->setCallback_([func, i](Try&& t) { + func(i, std::move(t)); + }); + } } -// when (variadic) +// collectAll (variadic) template -typename detail::VariadicContext< +typename detail::CollectAllVariadicContext< typename std::decay::type::value_type...>::type collectAll(Fs&&... fs) { - auto ctx = - new detail::VariadicContext::type::value_type...>(); - ctx->total = sizeof...(fs); - auto f_saved = ctx->p.getFuture(); - detail::collectAllVariadicHelper(ctx, - std::forward::type>(fs)...); - return f_saved; + auto ctx = std::make_shared::type::value_type...>>(); + detail::collectVariadicHelper( + ctx, std::forward::type>(fs)...); + return ctx->p.getFuture(); } -// when (iterator) +// collectAll (iterator) template Future< @@ -560,184 +546,133 @@ collectAll(InputIterator first, InputIterator last) { typedef typename std::iterator_traits::value_type::value_type T; - if (first >= last) { - return makeFuture(std::vector>()); - } - size_t n = std::distance(first, last); - - auto ctx = new detail::WhenAllContext(); - - ctx->results.resize(n); - - auto f_saved = ctx->p.getFuture(); - - for (size_t i = 0; first != last; ++first, ++i) { - assert(i < n); - auto& f = *first; - f.setCallback_([ctx, i, n](Try t) { - ctx->results[i] = std::move(t); - if (++ctx->count == n) { - ctx->p.setValue(std::move(ctx->results)); - delete ctx; - } - }); - } + struct CollectAllContext { + CollectAllContext(int n) : results(n) {} + ~CollectAllContext() { + p.setValue(std::move(results)); + } + Promise>> p; + std::vector> results; + }; - return f_saved; + auto ctx = std::make_shared(std::distance(first, last)); + mapSetCallback(first, last, [ctx](size_t i, Try&& t) { + ctx->results[i] = std::move(t); + }); + return ctx->p.getFuture(); } -namespace detail { - -template struct CollectContextHelper; +// collect (iterator) -template -struct CollectContextHelper::value>::type> { - static inline std::vector&& getResults(std::vector& results) { - return std::move(results); - } -}; - -template -struct CollectContextHelper::value>::type> { - static inline std::vector getResults(std::vector& results) { - std::vector finalResults; - finalResults.reserve(results.size()); - for (auto& opt : results) { - finalResults.push_back(std::move(opt.value())); - } - return finalResults; - } -}; +namespace detail { template struct CollectContext { - - typedef typename std::conditional< - std::is_default_constructible::value, - T, - Optional - >::type VecT; - - explicit CollectContext(int n) : count(0), threw(false) { - results.resize(n); - } - - Promise> p; - std::vector results; - std::atomic count; - std::atomic_bool threw; - - typedef std::vector result_type; - - static inline Future> makeEmptyFuture() { - return makeFuture(std::vector()); - } - - inline void setValue() { - p.setValue(CollectContextHelper::getResults(results)); + struct Nothing { explicit Nothing(int n) {} }; + + using Result = typename std::conditional< + std::is_void::value, + void, + std::vector>::type; + + using InternalResult = typename std::conditional< + std::is_void::value, + Nothing, + std::vector>>::type; + + explicit CollectContext(int n) : result(n) {} + ~CollectContext() { + if (!threw.exchange(true)) { + // map Optional -> T + std::vector finalResult; + finalResult.reserve(result.size()); + std::transform(result.begin(), result.end(), + std::back_inserter(finalResult), + [](Optional& o) { return std::move(o.value()); }); + p.setValue(std::move(finalResult)); + } } - - inline void addResult(int i, Try& t) { - results[i] = std::move(t.value()); + inline void setPartialResult(size_t i, Try& t) { + result[i] = std::move(t.value()); } + Promise p; + InternalResult result; + std::atomic threw; }; -template <> -struct CollectContext { - - explicit CollectContext(int n) : count(0), threw(false) {} - - Promise p; - std::atomic count; - std::atomic_bool threw; - - typedef void result_type; - - static inline Future makeEmptyFuture() { - return makeFuture(); - } +// Specialize for void (implementations in Future.cpp) - inline void setValue() { - p.setValue(); - } +template <> +CollectContext::~CollectContext(); - inline void addResult(int i, Try& t) { - // do nothing - } -}; +template <> +void CollectContext::setPartialResult(size_t i, Try& t); -} // detail +} template Future::value_type::value_type ->::result_type> + typename std::iterator_traits::value_type::value_type>::Result> collect(InputIterator first, InputIterator last) { typedef typename std::iterator_traits::value_type::value_type T; - if (first >= last) { - return detail::CollectContext::makeEmptyFuture(); - } - - size_t n = std::distance(first, last); - auto ctx = new detail::CollectContext(n); - auto f_saved = ctx->p.getFuture(); - - for (size_t i = 0; first != last; ++first, ++i) { - assert(i < n); - auto& f = *first; - f.setCallback_([ctx, i, n](Try t) { - auto c = ++ctx->count; - - if (t.hasException()) { - if (!ctx->threw.exchange(true)) { - ctx->p.setException(std::move(t.exception())); - } - } else if (!ctx->threw) { - ctx->addResult(i, t); - if (c == n) { - ctx->setValue(); - } + auto ctx = std::make_shared>( + std::distance(first, last)); + mapSetCallback(first, last, [ctx](size_t i, Try&& t) { + if (t.hasException()) { + if (!ctx->threw.exchange(true)) { + ctx->p.setException(std::move(t.exception())); } + } else if (!ctx->threw) { + ctx->setPartialResult(i, t); + } + }); + return ctx->p.getFuture(); +} - if (c == n) { - delete ctx; - } - }); - } +// collect (variadic) - return f_saved; +template +typename detail::CollectVariadicContext< + typename std::decay::type::value_type...>::type +collect(Fs&&... fs) { + auto ctx = std::make_shared::type::value_type...>>(); + detail::collectVariadicHelper( + ctx, std::forward::type>(fs)...); + return ctx->p.getFuture(); } +// collectAny (iterator) + template Future< std::pair::value_type::value_type> > > + std::iterator_traits::value_type::value_type>>> collectAny(InputIterator first, InputIterator last) { typedef typename std::iterator_traits::value_type::value_type T; - auto ctx = new detail::WhenAnyContext(std::distance(first, last)); - auto f_saved = ctx->p.getFuture(); - - for (size_t i = 0; first != last; first++, i++) { - auto& f = *first; - f.setCallback_([i, ctx](Try&& t) { - if (!ctx->done.exchange(true)) { - ctx->p.setValue(std::make_pair(i, std::move(t))); - } - ctx->decref(); - }); - } + struct CollectAnyContext { + CollectAnyContext(size_t n) : done(false) {}; + Promise>> p; + std::atomic done; + }; - return f_saved; + auto ctx = std::make_shared(std::distance(first, last)); + mapSetCallback(first, last, [ctx](size_t i, Try&& t) { + if (!ctx->done.exchange(true)) { + ctx->p.setValue(std::make_pair(i, std::move(t))); + } + }); + return ctx->p.getFuture(); } +// collectN (iterator) + template Future::value_type::value_type>>>> @@ -746,90 +681,184 @@ collectN(InputIterator first, InputIterator last, size_t n) { std::iterator_traits::value_type::value_type T; typedef std::vector>> V; - struct ctx_t { + struct CollectNContext { V v; - size_t completed; + std::atomic completed = {0}; Promise p; }; - auto ctx = std::make_shared(); - ctx->completed = 0; - - // for each completed Future, increase count and add to vector, until we - // have n completed futures at which point we fulfill our Promise with the - // vector - auto it = first; - size_t i = 0; - while (it != last) { - it->then([ctx, n, i](Try&& t) { - auto& v = ctx->v; + auto ctx = std::make_shared(); + + if (size_t(std::distance(first, last)) < n) { + ctx->p.setException(std::runtime_error("Not enough futures")); + } else { + // for each completed Future, increase count and add to vector, until we + // have n completed futures at which point we fulfil our Promise with the + // vector + mapSetCallback(first, last, [ctx, n](size_t i, Try&& t) { auto c = ++ctx->completed; if (c <= n) { assert(ctx->v.size() < n); - v.push_back(std::make_pair(i, std::move(t))); + ctx->v.push_back(std::make_pair(i, std::move(t))); if (c == n) { - ctx->p.setTry(Try(std::move(v))); + ctx->p.setTry(Try(std::move(ctx->v))); } } }); - - it++; - i++; - } - - if (i < n) { - ctx->p.setException(std::runtime_error("Not enough futures")); } return ctx->p.getFuture(); } -template -typename std::enable_if::value, Future>::type -reduce(It first, It last, T initial, F func) { +// reduce (iterator) + +template +Future reduce(It first, It last, T&& initial, F&& func) { if (first == last) { return makeFuture(std::move(initial)); } + typedef typename std::iterator_traits::value_type::value_type ItT; + typedef typename std::conditional< + detail::callableWith&&>::value, Try, ItT>::type Arg; typedef isTry IsTry; - return collectAll(first, last) - .then([initial, func](std::vector>& vals) mutable { - for (auto& val : vals) { - initial = func(std::move(initial), - // Either return a ItT&& or a Try&& depending - // on the type of the argument of func. - val.template get()); - } - return initial; + folly::MoveWrapper minitial(std::move(initial)); + auto sfunc = std::make_shared(std::move(func)); + + auto f = first->then([minitial, sfunc](Try& head) mutable { + return (*sfunc)(std::move(*minitial), + head.template get()); + }); + + for (++first; first != last; ++first) { + f = collectAll(f, *first).then([sfunc](std::tuple, Try>& t) { + return (*sfunc)(std::move(std::get<0>(t).value()), + // Either return a ItT&& or a Try&& depending + // on the type of the argument of func. + std::get<1>(t).template get()); }); + } + + return f; +} + +// window (collection) + +template +std::vector> +window(Collection input, F func, size_t n) { + struct WindowContext { + WindowContext(Collection&& i, F&& fn) + : i_(0), input_(std::move(i)), promises_(input_.size()), + func_(std::move(fn)) + {} + std::atomic i_; + Collection input_; + std::vector> promises_; + F func_; + + static inline void spawn(const std::shared_ptr& ctx) { + size_t i = ctx->i_++; + if (i < ctx->input_.size()) { + // Using setCallback_ directly since we don't need the Future + ctx->func_(std::move(ctx->input_[i])).setCallback_( + // ctx is captured by value + [ctx, i](Try&& t) { + ctx->promises_[i].setTry(std::move(t)); + // Chain another future onto this one + spawn(std::move(ctx)); + }); + } + } + }; + + auto max = std::min(n, input.size()); + + auto ctx = std::make_shared( + std::move(input), std::move(func)); + + for (size_t i = 0; i < max; ++i) { + // Start the first n Futures + WindowContext::spawn(ctx); + } + + std::vector> futures; + futures.reserve(ctx->promises_.size()); + for (auto& promise : ctx->promises_) { + futures.emplace_back(promise.getFuture()); + } + + return futures; +} + +// reduce + +template +template +Future Future::reduce(I&& initial, F&& func) { + folly::MoveWrapper minitial(std::move(initial)); + folly::MoveWrapper mfunc(std::move(func)); + return then([minitial, mfunc](T& vals) mutable { + auto ret = std::move(*minitial); + for (auto& val : vals) { + ret = (*mfunc)(std::move(ret), std::move(val)); + } + return ret; + }); } +// unorderedReduce (iterator) + template -typename std::enable_if::value, Future>::type -reduce(It first, It last, T initial, F func) { +Future unorderedReduce(It first, It last, T initial, F func) { if (first == last) { return makeFuture(std::move(initial)); } typedef isTry IsTry; - auto f = first->then([initial, func](Try& head) mutable { - return func(std::move(initial), - head.template get()); - }); + struct UnorderedReduceContext { + UnorderedReduceContext(T&& memo, F&& fn, size_t n) + : lock_(), memo_(makeFuture(std::move(memo))), + func_(std::move(fn)), numThens_(0), numFutures_(n), promise_() + {}; + folly::MicroSpinLock lock_; // protects memo_ and numThens_ + Future memo_; + F func_; + size_t numThens_; // how many Futures completed and called .then() + size_t numFutures_; // how many Futures in total + Promise promise_; + }; - for (++first; first != last; ++first) { - f = collectAll(f, *first).then([func](std::tuple, Try>& t) { - return func(std::move(std::get<0>(t).value()), - // Either return a ItT&& or a Try&& depending - // on the type of the argument of func. - std::get<1>(t).template get()); + auto ctx = std::make_shared( + std::move(initial), std::move(func), std::distance(first, last)); + + mapSetCallback(first, last, [ctx](size_t i, Try&& t) { + folly::MoveWrapper> mt(std::move(t)); + // Futures can be completed in any order, simultaneously. + // To make this non-blocking, we create a new Future chain in + // the order of completion to reduce the values. + // The spinlock just protects chaining a new Future, not actually + // executing the reduce, which should be really fast. + folly::MSLGuard lock(ctx->lock_); + ctx->memo_ = ctx->memo_.then([ctx, mt](T&& v) mutable { + // Either return a ItT&& or a Try&& depending + // on the type of the argument of func. + return ctx->func_(std::move(v), mt->template get()); }); - } + if (++ctx->numThens_ == ctx->numFutures_) { + // After reducing the value of the last Future, fulfill the Promise + ctx->memo_.setCallback_([ctx](Try&& t2) { + ctx->promise_.setValue(std::move(t2)); + }); + } + }); - return f; + return ctx->promise_.getFuture(); } +// within + template Future Future::within(Duration dur, Timekeeper* tk) { return within(dur, TimedOut(), tk); @@ -868,9 +897,11 @@ Future Future::within(Duration dur, E e, Timekeeper* tk) { } }); - return ctx->promise.getFuture(); + return ctx->promise.getFuture().via(getExecutor()); } +// delayed + template Future Future::delayed(Duration dur, Timekeeper* tk) { return collectAll(*this, futures::sleep(dur, tk)) @@ -1008,11 +1039,27 @@ inline void Future::getVia(DrivableExecutor* e) { waitVia(e).value(); } +namespace detail { + template + struct TryEquals { + static bool equals(const Try& t1, const Try& t2) { + return t1.value() == t2.value(); + } + }; + + template <> + struct TryEquals { + static bool equals(const Try& t1, const Try& t2) { + return true; + } + }; +} + template Future Future::willEqual(Future& f) { return collectAll(*this, f).then([](const std::tuple, Try>& t) { if (std::get<0>(t).hasValue() && std::get<1>(t).hasValue()) { - return std::get<0>(t).value() == std::get<1>(t).value(); + return detail::TryEquals::equals(std::get<0>(t), std::get<1>(t)); } else { return false; } @@ -1032,30 +1079,47 @@ Future Future::filter(F predicate) { }); } -namespace futures { - namespace { - template - Future chainHelper(Future f) { - return f; - } +template +template +auto Future::thenMulti(Callback&& fn) + -> decltype(this->then(std::forward(fn))) { + // thenMulti with one callback is just a then + return then(std::forward(fn)); +} - template - Future chainHelper(F f, Fn fn, Callbacks... fns) { - return chainHelper(f.then(fn), fns...); - } - } +template +template +auto Future::thenMulti(Callback&& fn, Callbacks&&... fns) + -> decltype(this->then(std::forward(fn)). + thenMulti(std::forward(fns)...)) { + // thenMulti with two callbacks is just then(a).thenMulti(b, ...) + return then(std::forward(fn)). + thenMulti(std::forward(fns)...); +} - template - std::function(Try)> - chain(Callbacks... fns) { - MoveWrapper> pw; - MoveWrapper> fw(chainHelper(pw->getFuture(), fns...)); - return [=](Try t) mutable { - pw->setTry(std::move(t)); - return std::move(*fw); - }; - } +template +template +auto Future::thenMultiWithExecutor(Executor* x, Callback&& fn, + Callbacks&&... fns) + -> decltype(this->then(std::forward(fn)). + thenMulti(std::forward(fns)...)) { + // thenMultiExecutor with two callbacks is + // via(x).then(a).thenMulti(b, ...).via(oldX) + auto oldX = getExecutor(); + setExecutor(x); + return then(std::forward(fn)). + thenMulti(std::forward(fns)...).via(oldX); +} +template +template +auto Future::thenMultiWithExecutor(Executor* x, Callback&& fn) + -> decltype(this->then(std::forward(fn))) { + // thenMulti with one callback is just a then with an executor + return then(x, std::forward(fn)); +} + +namespace futures { template std::vector> map(It first, It last, F func) { std::vector> results; @@ -1066,6 +1130,14 @@ namespace futures { } } +// Instantiate the most common Future types to save compile time +extern template class Future; +extern template class Future; +extern template class Future; +extern template class Future; +extern template class Future; +extern template class Future; + } // namespace folly // I haven't included a Future specialization because I don't forsee us