collect()
authorJames Sedgwick <jsedgwick@fb.com>
Thu, 16 Apr 2015 01:32:11 +0000 (18:32 -0700)
committerAlecs King <int@fb.com>
Mon, 27 Apr 2015 23:42:29 +0000 (16:42 -0700)
Summary: title

Test Plan: unit

Reviewed By: hans@fb.com

Subscribers: fbcode-common-diffs@, targeting-diff-backend@, zhuohuang, thom, folly-diffs@, jsedgwick, yfeldblum, chalfant

FB internal diff: D1992144

Tasks: 6025255

Signature: t1:1992144:1429120337:7678f790dd0f383295c036e6627bdf417ae43fc7

folly/futures/Future-inl.h
folly/futures/Future.h
folly/futures/test/FutureTest.cpp

index 0a616a86836aba5d391a5272794ebf3acae9d145..186afe365069806d13156d8a2e67c47d306d9776 100644 (file)
@@ -536,8 +536,7 @@ Future<void> via(Executor* executor) {
 template <typename... Fs>
 typename detail::VariadicContext<
   typename std::decay<Fs>::type::value_type...>::type
-whenAll(Fs&&... fs)
-{
+whenAll(Fs&&... fs) {
   auto ctx =
     new detail::VariadicContext<typename std::decay<Fs>::type::value_type...>();
   ctx->total = sizeof...(fs);
@@ -553,8 +552,7 @@ template <class InputIterator>
 Future<
   std::vector<
   Try<typename std::iterator_traits<InputIterator>::value_type::value_type>>>
-whenAll(InputIterator first, InputIterator last)
-{
+whenAll(InputIterator first, InputIterator last) {
   typedef
     typename std::iterator_traits<InputIterator>::value_type::value_type T;
 
@@ -572,13 +570,106 @@ whenAll(InputIterator first, InputIterator last)
   for (size_t i = 0; first != last; ++first, ++i) {
      assert(i < n);
      auto& f = *first;
-     f.setCallback_([ctx, i, n](Try<T>&& t) {
-         ctx->results[i] = std::move(t);
-         if (++ctx->count == n) {
-           ctx->p.setValue(std::move(ctx->results));
-           delete ctx;
+     f.setCallback_([ctx, i, n](Try<T> t) {
+       ctx->results[i] = std::move(t);
+       if (++ctx->count == n) {
+         ctx->p.setValue(std::move(ctx->results));
+         delete ctx;
+       }
+     });
+  }
+
+  return f_saved;
+}
+
+namespace detail {
+
+template <typename T>
+struct CollectContext {
+  explicit CollectContext(int n) : count(0), threw(false) {
+    results.resize(n);
+  }
+  Promise<std::vector<T>> p;
+  std::vector<T> results;
+  std::atomic<size_t> count;
+  std::atomic_bool threw;
+
+  typedef std::vector<T> result_type;
+
+  static inline Future<std::vector<T>> makeEmptyFuture() {
+    return makeFuture(std::vector<T>());
+  }
+
+  inline void setValue() {
+    p.setValue(std::move(results));
+  }
+
+  inline void addResult(int i, Try<T>& t) {
+    results[i] = std::move(t.value());
+  }
+};
+
+template <>
+struct CollectContext<void> {
+  explicit CollectContext(int n) : count(0), threw(false) {}
+  Promise<void> p;
+  std::atomic<size_t> count;
+  std::atomic_bool threw;
+
+  typedef void result_type;
+
+  static inline Future<void> makeEmptyFuture() {
+    return makeFuture();
+  }
+
+  inline void setValue() {
+    p.setValue();
+  }
+
+  inline void addResult(int i, Try<void>& t) {
+    // do nothing
+  }
+};
+
+} // detail
+
+template <class InputIterator>
+Future<typename detail::CollectContext<
+  typename std::iterator_traits<InputIterator>::value_type::value_type
+>::result_type>
+collect(InputIterator first, InputIterator last) {
+  typedef
+    typename std::iterator_traits<InputIterator>::value_type::value_type T;
+
+  if (first >= last) {
+    return detail::CollectContext<T>::makeEmptyFuture();
+  }
+
+  size_t n = std::distance(first, last);
+  auto ctx = new detail::CollectContext<T>(n);
+  auto f_saved = ctx->p.getFuture();
+
+  for (size_t i = 0; first != last; ++first, ++i) {
+     assert(i < n);
+     auto& f = *first;
+     f.setCallback_([ctx, i, n](Try<T> t) {
+       auto c = ++ctx->count;
+
+       if (t.hasException()) {
+         if (!ctx->threw.exchange(true)) {
+           ctx->p.setException(std::move(t.exception()));
          }
-       });
+       } else if (!ctx->threw) {
+         ctx->addResult(i, t);
+         if (c == n) {
+           ctx->setValue();
+         }
+       }
+
+       if (c == n) {
+         delete ctx;
+       }
+     });
   }
 
   return f_saved;
index 6919fd4e32a89e049ba802eb263829895576b489..90207dd8a1000a807b9a18a9ba99b1d17d2a2934 100644 (file)
@@ -56,6 +56,7 @@ namespace detail {
 
 template <class> struct Core;
 template <class...> struct VariadicContext;
+template <class> struct CollectContext;
 
 template<typename F, typename... Args>
 using resultOf = decltype(std::declval<F>()(std::declval<Args>()...));
@@ -622,6 +623,15 @@ typename detail::VariadicContext<
   typename std::decay<Fs>::type::value_type...>::type
 whenAll(Fs&&... fs);
 
+/// Like whenAll, but will short circuit on the first exception. Thus, the
+/// type of the returned Future is std::vector<T> instead of
+/// std::vector<Try<T>>
+template <class InputIterator>
+Future<typename detail::CollectContext<
+  typename std::iterator_traits<InputIterator>::value_type::value_type
+>::result_type>
+collect(InputIterator first, InputIterator last);
+
 /** The result is a pair of the index of the first Future to complete and
   the Try. If multiple Futures complete at the same time (or are already
   complete when passed in), the "winner" is chosen non-deterministically.
index c74458ac0cf38eeff3a894277db7605c9db898b1..82ac1f175caf1dc38350273b5471587eb8780f3a 100644 (file)
@@ -766,6 +766,118 @@ TEST(Future, whenAll) {
   }
 }
 
+TEST(Future, collect) {
+  // success case
+  {
+    vector<Promise<int>> promises(10);
+    vector<Future<int>> futures;
+
+    for (auto& p : promises)
+      futures.push_back(p.getFuture());
+
+    auto allf = collect(futures.begin(), futures.end());
+
+    random_shuffle(promises.begin(), promises.end());
+    for (auto& p : promises) {
+      EXPECT_FALSE(allf.isReady());
+      p.setValue(42);
+    }
+
+    EXPECT_TRUE(allf.isReady());
+    for (auto i : allf.value()) {
+      EXPECT_EQ(42, i);
+    }
+  }
+
+  // failure case
+  {
+    vector<Promise<int>> promises(10);
+    vector<Future<int>> futures;
+
+    for (auto& p : promises)
+      futures.push_back(p.getFuture());
+
+    auto allf = collect(futures.begin(), futures.end());
+
+    random_shuffle(promises.begin(), promises.end());
+    for (int i = 0; i < 10; i++) {
+      if (i < 5) {
+        // everthing goes well so far...
+        EXPECT_FALSE(allf.isReady());
+        promises[i].setValue(42);
+      } else if (i == 5) {
+        // short circuit with an exception
+        EXPECT_FALSE(allf.isReady());
+        promises[i].setException(eggs);
+        EXPECT_TRUE(allf.isReady());
+      } else if (i < 8) {
+        // don't blow up on further values
+        EXPECT_TRUE(allf.isReady());
+        promises[i].setValue(42);
+      } else {
+        // don't blow up on further exceptions
+        EXPECT_TRUE(allf.isReady());
+        promises[i].setException(eggs);
+      }
+    }
+
+    EXPECT_THROW(allf.value(), eggs_t);
+  }
+
+  // void futures success case
+  {
+    vector<Promise<void>> promises(10);
+    vector<Future<void>> futures;
+
+    for (auto& p : promises)
+      futures.push_back(p.getFuture());
+
+    auto allf = collect(futures.begin(), futures.end());
+
+    random_shuffle(promises.begin(), promises.end());
+    for (auto& p : promises) {
+      EXPECT_FALSE(allf.isReady());
+      p.setValue();
+    }
+
+    EXPECT_TRUE(allf.isReady());
+  }
+
+  // void futures failure case
+  {
+    vector<Promise<void>> promises(10);
+    vector<Future<void>> futures;
+
+    for (auto& p : promises)
+      futures.push_back(p.getFuture());
+
+    auto allf = collect(futures.begin(), futures.end());
+
+    random_shuffle(promises.begin(), promises.end());
+    for (int i = 0; i < 10; i++) {
+      if (i < 5) {
+        // everthing goes well so far...
+        EXPECT_FALSE(allf.isReady());
+        promises[i].setValue();
+      } else if (i == 5) {
+        // short circuit with an exception
+        EXPECT_FALSE(allf.isReady());
+        promises[i].setException(eggs);
+        EXPECT_TRUE(allf.isReady());
+      } else if (i < 8) {
+        // don't blow up on further values
+        EXPECT_TRUE(allf.isReady());
+        promises[i].setValue();
+      } else {
+        // don't blow up on further exceptions
+        EXPECT_TRUE(allf.isReady());
+        promises[i].setException(eggs);
+      }
+    }
+
+    EXPECT_THROW(allf.value(), eggs_t);
+  }
+}
 
 TEST(Future, whenAny) {
   {