From 4a92da5b87f3c4616e31875a4b62cca015ebea36 Mon Sep 17 00:00:00 2001 From: Hannes Roth Date: Fri, 8 May 2015 16:34:32 -0700 Subject: [PATCH] (Wangle) Implement collect* using mapSetCallback and shared_ptrs Summary: I figured it would make sense to implement all the collect* functions using a shared_ptr, instead of doing our manual reference counting and all that. Fulfilling the promise in the destructor seemed like the icing on the cake. Also saves some line of code. Test Plan: Run all the tests. Reviewed By: hans@fb.com Subscribers: folly-diffs@, jsedgwick, yfeldblum, chalfant FB internal diff: D2015320 Signature: t1:2015320:1431106133:ac3001b3696fc75230afe70908ed349102b02a45 --- folly/futures/Future-inl.h | 271 ++++++++++++++---------------------- folly/futures/Future.cpp | 16 +++ folly/futures/detail/Core.h | 44 ++---- 3 files changed, 126 insertions(+), 205 deletions(-) diff --git a/folly/futures/Future-inl.h b/folly/futures/Future-inl.h index 33b758ea..62f48007 100644 --- a/folly/futures/Future-inl.h +++ b/folly/futures/Future-inl.h @@ -531,22 +531,31 @@ inline Future via(Executor* executor) { return makeFuture().via(executor); } -// when (variadic) +// mapSetCallback calls func(i, Try) when every future completes + +template +void mapSetCallback(InputIterator first, InputIterator last, F func) { + for (size_t i = 0; first != last; ++first, ++i) { + first->setCallback_([func, i](Try&& t) { + func(i, std::move(t)); + }); + } +} + +// collectAll (variadic) template typename detail::VariadicContext< typename std::decay::type::value_type...>::type collectAll(Fs&&... fs) { - auto ctx = - new detail::VariadicContext::type::value_type...>(); - ctx->total = sizeof...(fs); - auto f_saved = ctx->p.getFuture(); + auto ctx = std::make_shared::type::value_type...>>(); detail::collectAllVariadicHelper(ctx, std::forward::type>(fs)...); - return f_saved; + return ctx->p.getFuture(); } -// when (iterator) +// collectAll (iterator) template Future< @@ -556,155 +565,87 @@ collectAll(InputIterator first, InputIterator last) { typedef typename std::iterator_traits::value_type::value_type T; - if (first >= last) { - return makeFuture(std::vector>()); - } - size_t n = std::distance(first, last); - - auto ctx = new detail::WhenAllContext(); - - ctx->results.resize(n); - - auto f_saved = ctx->p.getFuture(); - - for (size_t i = 0; first != last; ++first, ++i) { - assert(i < n); - auto& f = *first; - f.setCallback_([ctx, i, n](Try t) { - ctx->results[i] = std::move(t); - if (++ctx->count == n) { - ctx->p.setValue(std::move(ctx->results)); - delete ctx; - } - }); - } + struct CollectAllContext { + CollectAllContext(int n) : results(n) {} + ~CollectAllContext() { + p.setValue(std::move(results)); + } + Promise>> p; + std::vector> results; + }; - return f_saved; + auto ctx = std::make_shared(std::distance(first, last)); + mapSetCallback(first, last, [ctx](size_t i, Try&& t) { + ctx->results[i] = std::move(t); + }); + return ctx->p.getFuture(); } namespace detail { -template struct CollectContextHelper; - -template -struct CollectContextHelper::value>::type> { - static inline std::vector&& getResults(std::vector& results) { - return std::move(results); - } -}; - -template -struct CollectContextHelper::value>::type> { - static inline std::vector getResults(std::vector& results) { - std::vector finalResults; - finalResults.reserve(results.size()); - for (auto& opt : results) { - finalResults.push_back(std::move(opt.value())); - } - return finalResults; - } -}; - template struct CollectContext { - - typedef typename std::conditional< - std::is_default_constructible::value, - T, - Optional - >::type VecT; - - explicit CollectContext(int n) : count(0), success_count(0), threw(false) { - results.resize(n); - } - - Promise> p; - std::vector results; - std::atomic count, success_count; - std::atomic_bool threw; - - typedef std::vector result_type; - - static inline Future> makeEmptyFuture() { - return makeFuture(std::vector()); - } - - inline void setValue() { - p.setValue(CollectContextHelper::getResults(results)); + struct Nothing { explicit Nothing(int n) {} }; + + using Result = typename std::conditional< + std::is_void::value, + void, + std::vector>::type; + + using InternalResult = typename std::conditional< + std::is_void::value, + Nothing, + std::vector>>::type; + + explicit CollectContext(int n) : result(n) {} + ~CollectContext() { + if (!threw.exchange(true)) { + // map Optional -> T + std::vector finalResult; + finalResult.reserve(result.size()); + std::transform(result.begin(), result.end(), + std::back_inserter(finalResult), + [](Optional& o) { return std::move(o.value()); }); + p.setValue(std::move(finalResult)); + } } - - inline void addResult(int i, Try& t) { - results[i] = std::move(t.value()); + inline void setPartialResult(size_t i, Try& t) { + result[i] = std::move(t.value()); } + Promise p; + InternalResult result; + std::atomic threw; }; -template <> -struct CollectContext { - - explicit CollectContext(int n) : count(0), success_count(0), threw(false) {} +// Specialize for void (implementations in Future.cpp) - Promise p; - std::atomic count, success_count; - std::atomic_bool threw; - - typedef void result_type; - - static inline Future makeEmptyFuture() { - return makeFuture(); - } - - inline void setValue() { - p.setValue(); - } +template <> +CollectContext::~CollectContext(); - inline void addResult(int i, Try& t) { - // do nothing - } -}; +template <> +void CollectContext::setPartialResult(size_t i, Try& t); -} // detail +} template Future::value_type::value_type ->::result_type> + typename std::iterator_traits::value_type::value_type>::Result> collect(InputIterator first, InputIterator last) { typedef typename std::iterator_traits::value_type::value_type T; - if (first >= last) { - return detail::CollectContext::makeEmptyFuture(); - } - - size_t n = std::distance(first, last); - auto ctx = new detail::CollectContext(n); - auto f_saved = ctx->p.getFuture(); - - for (size_t i = 0; first != last; ++first, ++i) { - assert(i < n); - auto& f = *first; - f.setCallback_([ctx, i, n](Try t) { - - if (t.hasException()) { - if (!ctx->threw.exchange(true)) { - ctx->p.setException(std::move(t.exception())); - } - } else if (!ctx->threw) { - ctx->addResult(i, t); - if (++ctx->success_count == n) { - ctx->setValue(); - } - } - - if (++ctx->count == n) { - delete ctx; + auto ctx = std::make_shared>( + std::distance(first, last)); + mapSetCallback(first, last, [ctx](size_t i, Try&& t) { + if (t.hasException()) { + if (!ctx->threw.exchange(true)) { + ctx->p.setException(std::move(t.exception())); } - }); - } - - return f_saved; + } else if (!ctx->threw) { + ctx->setPartialResult(i, t); + } + }); + return ctx->p.getFuture(); } template @@ -712,25 +653,24 @@ Future< std::pair::value_type::value_type> > > + std::iterator_traits::value_type::value_type>>> collectAny(InputIterator first, InputIterator last) { typedef typename std::iterator_traits::value_type::value_type T; - auto ctx = new detail::WhenAnyContext(std::distance(first, last)); - auto f_saved = ctx->p.getFuture(); - - for (size_t i = 0; first != last; first++, i++) { - auto& f = *first; - f.setCallback_([i, ctx](Try&& t) { - if (!ctx->done.exchange(true)) { - ctx->p.setValue(std::make_pair(i, std::move(t))); - } - ctx->decref(); - }); - } + struct CollectAnyContext { + CollectAnyContext(size_t n) : done(false) {}; + Promise>> p; + std::atomic done; + }; - return f_saved; + auto ctx = std::make_shared(std::distance(first, last)); + mapSetCallback(first, last, [ctx](size_t i, Try&& t) { + if (!ctx->done.exchange(true)) { + ctx->p.setValue(std::make_pair(i, std::move(t))); + } + }); + return ctx->p.getFuture(); } template @@ -741,38 +681,29 @@ collectN(InputIterator first, InputIterator last, size_t n) { std::iterator_traits::value_type::value_type T; typedef std::vector>> V; - struct ctx_t { + struct CollectNContext { V v; - size_t completed; + std::atomic completed = {0}; Promise p; }; - auto ctx = std::make_shared(); - ctx->completed = 0; - - // for each completed Future, increase count and add to vector, until we - // have n completed futures at which point we fulfill our Promise with the - // vector - auto it = first; - size_t i = 0; - while (it != last) { - it->then([ctx, n, i](Try&& t) { - auto& v = ctx->v; + auto ctx = std::make_shared(); + + if (std::distance(first, last) < n) { + ctx->p.setException(std::runtime_error("Not enough futures")); + } else { + // for each completed Future, increase count and add to vector, until we + // have n completed futures at which point we fulfil our Promise with the + // vector + mapSetCallback(first, last, [ctx, n](size_t i, Try&& t) { auto c = ++ctx->completed; if (c <= n) { assert(ctx->v.size() < n); - v.push_back(std::make_pair(i, std::move(t))); + ctx->v.push_back(std::make_pair(i, std::move(t))); if (c == n) { - ctx->p.setTry(Try(std::move(v))); + ctx->p.setTry(Try(std::move(ctx->v))); } } }); - - it++; - i++; - } - - if (i < n) { - ctx->p.setException(std::runtime_error("Not enough futures")); } return ctx->p.getFuture(); diff --git a/folly/futures/Future.cpp b/folly/futures/Future.cpp index 0f6dc3d1..78f33d26 100644 --- a/folly/futures/Future.cpp +++ b/folly/futures/Future.cpp @@ -39,3 +39,19 @@ Future sleep(Duration dur, Timekeeper* tk) { } }} + +namespace folly { namespace detail { + +template <> +CollectContext::~CollectContext() { + if (!threw.exchange(true)) { + p.setValue(); + } +} + +template <> +void CollectContext::setPartialResult(size_t i, Try& t) { + // Nothing to do for void +} + +}} diff --git a/folly/futures/detail/Core.h b/folly/futures/detail/Core.h index 7e23dd7c..65e2cb1d 100644 --- a/folly/futures/detail/Core.h +++ b/folly/futures/detail/Core.h @@ -319,59 +319,33 @@ class Core { template struct VariadicContext { - VariadicContext() : total(0), count(0) {} - Promise... > > p; + VariadicContext() {} + ~VariadicContext() { + p.setValue(std::move(results)); + } + Promise... >> p; std::tuple... > results; - size_t total; - std::atomic count; typedef Future...>> type; }; template typename std::enable_if::type -collectAllVariadicHelper(VariadicContext *ctx, THead&& head, Fs&&... tail) { +collectAllVariadicHelper(std::shared_ptr> ctx, + THead&& head, Fs&&... tail) { head.setCallback_([ctx](Try&& t) { std::get(ctx->results) = std::move(t); - if (++ctx->count == ctx->total) { - ctx->p.setValue(std::move(ctx->results)); - delete ctx; - } }); } template typename std::enable_if::type -collectAllVariadicHelper(VariadicContext *ctx, THead&& head, Fs&&... tail) { +collectAllVariadicHelper(std::shared_ptr> ctx, + THead&& head, Fs&&... tail) { head.setCallback_([ctx](Try&& t) { std::get(ctx->results) = std::move(t); - if (++ctx->count == ctx->total) { - ctx->p.setValue(std::move(ctx->results)); - delete ctx; - } }); // template tail-recursion collectAllVariadicHelper(ctx, std::forward(tail)...); } -template -struct WhenAllContext { - WhenAllContext() : count(0) {} - Promise > > p; - std::vector > results; - std::atomic count; -}; - -template -struct WhenAnyContext { - explicit WhenAnyContext(size_t n) : done(false), ref_count(n) {}; - Promise>> p; - std::atomic done; - std::atomic ref_count; - void decref() { - if (--ref_count == 0) { - delete this; - } - } -}; - }} // folly::detail -- 2.34.1