From 162c972335772b3e972fa63a7f4499ff066cd17b Mon Sep 17 00:00:00 2001 From: James Sedgwick Date: Wed, 15 Apr 2015 18:32:11 -0700 Subject: [PATCH] collect() Summary: title Test Plan: unit Reviewed By: hans@fb.com Subscribers: fbcode-common-diffs@, targeting-diff-backend@, zhuohuang, thom, folly-diffs@, jsedgwick, yfeldblum, chalfant FB internal diff: D1992144 Tasks: 6025255 Signature: t1:1992144:1429120337:7678f790dd0f383295c036e6627bdf417ae43fc7 --- folly/futures/Future-inl.h | 111 ++++++++++++++++++++++++++--- folly/futures/Future.h | 10 +++ folly/futures/test/FutureTest.cpp | 112 ++++++++++++++++++++++++++++++ 3 files changed, 223 insertions(+), 10 deletions(-) diff --git a/folly/futures/Future-inl.h b/folly/futures/Future-inl.h index 0a616a86..186afe36 100644 --- a/folly/futures/Future-inl.h +++ b/folly/futures/Future-inl.h @@ -536,8 +536,7 @@ Future via(Executor* executor) { template typename detail::VariadicContext< typename std::decay::type::value_type...>::type -whenAll(Fs&&... fs) -{ +whenAll(Fs&&... fs) { auto ctx = new detail::VariadicContext::type::value_type...>(); ctx->total = sizeof...(fs); @@ -553,8 +552,7 @@ template Future< std::vector< Try::value_type::value_type>>> -whenAll(InputIterator first, InputIterator last) -{ +whenAll(InputIterator first, InputIterator last) { typedef typename std::iterator_traits::value_type::value_type T; @@ -572,13 +570,106 @@ whenAll(InputIterator first, InputIterator last) for (size_t i = 0; first != last; ++first, ++i) { assert(i < n); auto& f = *first; - f.setCallback_([ctx, i, n](Try&& t) { - ctx->results[i] = std::move(t); - if (++ctx->count == n) { - ctx->p.setValue(std::move(ctx->results)); - delete ctx; + f.setCallback_([ctx, i, n](Try t) { + ctx->results[i] = std::move(t); + if (++ctx->count == n) { + ctx->p.setValue(std::move(ctx->results)); + delete ctx; + } + }); + } + + return f_saved; +} + +namespace detail { + +template +struct CollectContext { + explicit CollectContext(int n) : count(0), threw(false) { + results.resize(n); + } + Promise> p; + std::vector results; + std::atomic count; + std::atomic_bool threw; + + typedef std::vector result_type; + + static inline Future> makeEmptyFuture() { + return makeFuture(std::vector()); + } + + inline void setValue() { + p.setValue(std::move(results)); + } + + inline void addResult(int i, Try& t) { + results[i] = std::move(t.value()); + } +}; + +template <> +struct CollectContext { + explicit CollectContext(int n) : count(0), threw(false) {} + Promise p; + std::atomic count; + std::atomic_bool threw; + + typedef void result_type; + + static inline Future makeEmptyFuture() { + return makeFuture(); + } + + inline void setValue() { + p.setValue(); + } + + inline void addResult(int i, Try& t) { + // do nothing + } +}; + +} // detail + +template +Future::value_type::value_type +>::result_type> +collect(InputIterator first, InputIterator last) { + typedef + typename std::iterator_traits::value_type::value_type T; + + if (first >= last) { + return detail::CollectContext::makeEmptyFuture(); + } + + size_t n = std::distance(first, last); + auto ctx = new detail::CollectContext(n); + auto f_saved = ctx->p.getFuture(); + + for (size_t i = 0; first != last; ++first, ++i) { + assert(i < n); + auto& f = *first; + f.setCallback_([ctx, i, n](Try t) { + auto c = ++ctx->count; + + if (t.hasException()) { + if (!ctx->threw.exchange(true)) { + ctx->p.setException(std::move(t.exception())); } - }); + } else if (!ctx->threw) { + ctx->addResult(i, t); + if (c == n) { + ctx->setValue(); + } + } + + if (c == n) { + delete ctx; + } + }); } return f_saved; diff --git a/folly/futures/Future.h b/folly/futures/Future.h index 6919fd4e..90207dd8 100644 --- a/folly/futures/Future.h +++ b/folly/futures/Future.h @@ -56,6 +56,7 @@ namespace detail { template struct Core; template struct VariadicContext; +template struct CollectContext; template using resultOf = decltype(std::declval()(std::declval()...)); @@ -622,6 +623,15 @@ typename detail::VariadicContext< typename std::decay::type::value_type...>::type whenAll(Fs&&... fs); +/// Like whenAll, but will short circuit on the first exception. Thus, the +/// type of the returned Future is std::vector instead of +/// std::vector> +template +Future::value_type::value_type +>::result_type> +collect(InputIterator first, InputIterator last); + /** The result is a pair of the index of the first Future to complete and the Try. If multiple Futures complete at the same time (or are already complete when passed in), the "winner" is chosen non-deterministically. diff --git a/folly/futures/test/FutureTest.cpp b/folly/futures/test/FutureTest.cpp index c74458ac..82ac1f17 100644 --- a/folly/futures/test/FutureTest.cpp +++ b/folly/futures/test/FutureTest.cpp @@ -766,6 +766,118 @@ TEST(Future, whenAll) { } } +TEST(Future, collect) { + // success case + { + vector> promises(10); + vector> futures; + + for (auto& p : promises) + futures.push_back(p.getFuture()); + + auto allf = collect(futures.begin(), futures.end()); + + random_shuffle(promises.begin(), promises.end()); + for (auto& p : promises) { + EXPECT_FALSE(allf.isReady()); + p.setValue(42); + } + + EXPECT_TRUE(allf.isReady()); + for (auto i : allf.value()) { + EXPECT_EQ(42, i); + } + } + + // failure case + { + vector> promises(10); + vector> futures; + + for (auto& p : promises) + futures.push_back(p.getFuture()); + + auto allf = collect(futures.begin(), futures.end()); + + random_shuffle(promises.begin(), promises.end()); + for (int i = 0; i < 10; i++) { + if (i < 5) { + // everthing goes well so far... + EXPECT_FALSE(allf.isReady()); + promises[i].setValue(42); + } else if (i == 5) { + // short circuit with an exception + EXPECT_FALSE(allf.isReady()); + promises[i].setException(eggs); + EXPECT_TRUE(allf.isReady()); + } else if (i < 8) { + // don't blow up on further values + EXPECT_TRUE(allf.isReady()); + promises[i].setValue(42); + } else { + // don't blow up on further exceptions + EXPECT_TRUE(allf.isReady()); + promises[i].setException(eggs); + } + } + + EXPECT_THROW(allf.value(), eggs_t); + } + + // void futures success case + { + vector> promises(10); + vector> futures; + + for (auto& p : promises) + futures.push_back(p.getFuture()); + + auto allf = collect(futures.begin(), futures.end()); + + random_shuffle(promises.begin(), promises.end()); + for (auto& p : promises) { + EXPECT_FALSE(allf.isReady()); + p.setValue(); + } + + EXPECT_TRUE(allf.isReady()); + } + + // void futures failure case + { + vector> promises(10); + vector> futures; + + for (auto& p : promises) + futures.push_back(p.getFuture()); + + auto allf = collect(futures.begin(), futures.end()); + + random_shuffle(promises.begin(), promises.end()); + for (int i = 0; i < 10; i++) { + if (i < 5) { + // everthing goes well so far... + EXPECT_FALSE(allf.isReady()); + promises[i].setValue(); + } else if (i == 5) { + // short circuit with an exception + EXPECT_FALSE(allf.isReady()); + promises[i].setException(eggs); + EXPECT_TRUE(allf.isReady()); + } else if (i < 8) { + // don't blow up on further values + EXPECT_TRUE(allf.isReady()); + promises[i].setValue(); + } else { + // don't blow up on further exceptions + EXPECT_TRUE(allf.isReady()); + promises[i].setException(eggs); + } + } + + EXPECT_THROW(allf.value(), eggs_t); + } +} TEST(Future, whenAny) { { -- 2.34.1