(Wangle) variadic collect
authorHannes Roth <hannesr@fb.com>
Mon, 8 Jun 2015 20:07:01 +0000 (13:07 -0700)
committerSara Golemon <sgolemon@fb.com>
Tue, 9 Jun 2015 20:21:25 +0000 (13:21 -0700)
Summary:
For D2099047 (matthieu) and also for symmetry. Can re-use most of the
code, also refactored it a bit (using an empty base case).

Test Plan:
Run all the tests.

Will add some more before committing.

Reviewed By: jsedgwick@fb.com

Subscribers: folly-diffs@, jsedgwick, yfeldblum, chalfant, matthieu

FB internal diff: D2131515

Signature: t1:2131515:1433776852:544166fbfdfabf6760fd78f87821290e17e6e4a3

folly/futures/Future-inl.h
folly/futures/Future-pre.h
folly/futures/detail/Core.h
folly/futures/helpers.h
folly/futures/test/CollectTest.cpp
folly/futures/test/FutureTest.cpp
folly/futures/test/WindowTest.cpp

index a63b2a801fb9c09a7f7566eafc9feb2874f8f96b..4b2c7477675cb22c94c4ef3a530b84f603dbf28b 100644 (file)
@@ -545,13 +545,13 @@ void mapSetCallback(InputIterator first, InputIterator last, F func) {
 // collectAll (variadic)
 
 template <typename... Fs>
-typename detail::VariadicContext<
+typename detail::CollectAllVariadicContext<
   typename std::decay<Fs>::type::value_type...>::type
 collectAll(Fs&&... fs) {
-  auto ctx = std::make_shared<detail::VariadicContext<
+  auto ctx = std::make_shared<detail::CollectAllVariadicContext<
     typename std::decay<Fs>::type::value_type...>>();
-  detail::collectAllVariadicHelper(ctx,
-    std::forward<typename std::decay<Fs>::type>(fs)...);
+  detail::collectVariadicHelper<detail::CollectAllVariadicContext>(
+    ctx, std::forward<typename std::decay<Fs>::type>(fs)...);
   return ctx->p.getFuture();
 }
 
@@ -581,6 +581,8 @@ collectAll(InputIterator first, InputIterator last) {
   return ctx->p.getFuture();
 }
 
+// collect (iterator)
+
 namespace detail {
 
 template <typename T>
@@ -648,6 +650,21 @@ collect(InputIterator first, InputIterator last) {
   return ctx->p.getFuture();
 }
 
+// collect (variadic)
+
+template <typename... Fs>
+typename detail::CollectVariadicContext<
+  typename std::decay<Fs>::type::value_type...>::type
+collect(Fs&&... fs) {
+  auto ctx = std::make_shared<detail::CollectVariadicContext<
+    typename std::decay<Fs>::type::value_type...>>();
+  detail::collectVariadicHelper<detail::CollectVariadicContext>(
+    ctx, std::forward<typename std::decay<Fs>::type>(fs)...);
+  return ctx->p.getFuture();
+}
+
+// collectAny (iterator)
+
 template <class InputIterator>
 Future<
   std::pair<size_t,
@@ -673,6 +690,8 @@ collectAny(InputIterator first, InputIterator last) {
   return ctx->p.getFuture();
 }
 
+// collectN (iterator)
+
 template <class InputIterator>
 Future<std::vector<std::pair<size_t, Try<typename
   std::iterator_traits<InputIterator>::value_type::value_type>>>>
@@ -709,6 +728,8 @@ collectN(InputIterator first, InputIterator last, size_t n) {
   return ctx->p.getFuture();
 }
 
+// reduce (iterator)
+
 template <class It, class T, class F>
 Future<T> reduce(It first, It last, T&& initial, F&& func) {
   if (first == last) {
@@ -740,6 +761,8 @@ Future<T> reduce(It first, It last, T&& initial, F&& func) {
   return f;
 }
 
+// window (collection)
+
 template <class Collection, class F, class ItT, class Result>
 std::vector<Future<Result>>
 window(Collection input, F func, size_t n) {
@@ -787,6 +810,8 @@ window(Collection input, F func, size_t n) {
   return futures;
 }
 
+// reduce
+
 template <class T>
 template <class I, class F>
 Future<I> Future<T>::reduce(I&& initial, F&& func) {
@@ -801,6 +826,8 @@ Future<I> Future<T>::reduce(I&& initial, F&& func) {
   });
 }
 
+// unorderedReduce (iterator)
+
 template <class It, class T, class F, class ItT, class Arg>
 Future<T> unorderedReduce(It first, It last, T initial, F func) {
   if (first == last) {
@@ -849,6 +876,8 @@ Future<T> unorderedReduce(It first, It last, T initial, F func) {
   return ctx->promise_.getFuture();
 }
 
+// within
+
 template <class T>
 Future<T> Future<T>::within(Duration dur, Timekeeper* tk) {
   return within(dur, TimedOut(), tk);
@@ -890,6 +919,8 @@ Future<T> Future<T>::within(Duration dur, E e, Timekeeper* tk) {
   return ctx->promise.getFuture().via(getExecutor());
 }
 
+// delayed
+
 template <class T>
 Future<T> Future<T>::delayed(Duration dur, Timekeeper* tk) {
   return collectAll(*this, futures::sleep(dur, tk))
index b7ee7b5c3f4538318897659bc4b25ac1018e706a..f76c24abfd0a976a579d672e388239e3cfd3c113 100644 (file)
@@ -41,7 +41,8 @@ struct isTry<Try<T>> : std::true_type {};
 namespace detail {
 
 template <class> class Core;
-template <class...> struct VariadicContext;
+template <class...> struct CollectAllVariadicContext;
+template <class...> struct CollectVariadicContext;
 template <class> struct CollectContext;
 
 template<typename F, typename... Args>
index 34d0a725d9479a5ae3fdc800e585fbd4fb58eafa..de0173e78b8fac3061cadffd9ff1ab8a8a5bd911 100644 (file)
@@ -341,34 +341,59 @@ class Core {
 };
 
 template <typename... Ts>
-struct VariadicContext {
-  VariadicContext() {}
-  ~VariadicContext() {
+struct CollectAllVariadicContext {
+  CollectAllVariadicContext() {}
+  template <typename T, size_t I>
+  inline void setPartialResult(Try<T>& t) {
+    std::get<I>(results) = std::move(t);
+  }
+  ~CollectAllVariadicContext() {
     p.setValue(std::move(results));
   }
-  Promise<std::tuple<Try<Ts>... >> p;
-  std::tuple<Try<Ts>... > results;
+  Promise<std::tuple<Try<Ts>...>> p;
+  std::tuple<Try<Ts>...> results;
   typedef Future<std::tuple<Try<Ts>...>> type;
 };
 
-template <typename... Ts, typename THead, typename... Fs>
-typename std::enable_if<sizeof...(Fs) == 0, void>::type
-collectAllVariadicHelper(std::shared_ptr<VariadicContext<Ts...>> ctx,
-                         THead&& head, Fs&&... tail) {
-  head.setCallback_([ctx](Try<typename THead::value_type>&& t) {
-    std::get<sizeof...(Ts) - sizeof...(Fs) - 1>(ctx->results) = std::move(t);
-  });
+template <typename... Ts>
+struct CollectVariadicContext {
+  CollectVariadicContext() {}
+  template <typename T, size_t I>
+  inline void setPartialResult(Try<T>& t) {
+    if (t.hasException()) {
+       if (!threw.exchange(true)) {
+         p.setException(std::move(t.exception()));
+       }
+     } else if (!threw) {
+       std::get<I>(results) = std::move(t.value());
+     }
+  }
+  ~CollectVariadicContext() {
+    if (!threw.exchange(true)) {
+      p.setValue(std::move(results));
+    }
+  }
+  Promise<std::tuple<Ts...>> p;
+  std::tuple<Ts...> results;
+  std::atomic<bool> threw;
+  typedef Future<std::tuple<Ts...>> type;
+};
+
+template <template <typename ...> class T, typename... Ts>
+void collectVariadicHelper(const std::shared_ptr<T<Ts...>>& ctx) {
+  // base case
 }
 
-template <typename... Ts, typename THead, typename... Fs>
-typename std::enable_if<sizeof...(Fs) != 0, void>::type
-collectAllVariadicHelper(std::shared_ptr<VariadicContext<Ts...>> ctx,
-                         THead&& head, Fs&&... tail) {
+template <template <typename ...> class T, typename... Ts,
+          typename THead, typename... TTail>
+void collectVariadicHelper(const std::shared_ptr<T<Ts...>>& ctx,
+                           THead&& head, TTail&&... tail) {
   head.setCallback_([ctx](Try<typename THead::value_type>&& t) {
-    std::get<sizeof...(Ts) - sizeof...(Fs) - 1>(ctx->results) = std::move(t);
+    ctx->template setPartialResult<typename THead::value_type,
+                                   sizeof...(Ts) - sizeof...(TTail) - 1>(t);
   });
   // template tail-recursion
-  collectAllVariadicHelper(ctx, std::forward<Fs>(tail)...);
+  collectVariadicHelper(ctx, std::forward<TTail>(tail)...);
 }
 
 }} // folly::detail
index 010a5a78cd3012ebedc6528fe521bf9098300c48..5ae28f1f4f41a092c34d981591a4fe2ef79f80be 100644 (file)
@@ -155,7 +155,7 @@ auto collectAll(Collection&& c) -> decltype(collectAll(c.begin(), c.end())) {
 /// is a Future<std::tuple<Try<T1>, Try<T2>, ...>>.
 /// The Futures are moved in, so your copies are invalid.
 template <typename... Fs>
-typename detail::VariadicContext<
+typename detail::CollectAllVariadicContext<
   typename std::decay<Fs>::type::value_type...>::type
 collectAll(Fs&&... fs);
 
@@ -174,6 +174,14 @@ auto collect(Collection&& c) -> decltype(collect(c.begin(), c.end())) {
   return collect(c.begin(), c.end());
 }
 
+/// Like collectAll, but will short circuit on the first exception. Thus, the
+/// type of the returned Future is std::tuple<T1, T2, ...> instead of
+/// std::tuple<Try<T1>, Try<T2>, ...>
+template <typename... Fs>
+typename detail::CollectVariadicContext<
+  typename std::decay<Fs>::type::value_type...>::type
+collect(Fs&&... fs);
+
 /** The result is a pair of the index of the first Future to complete and
   the Try. If multiple Futures complete at the same time (or are already
   complete when passed in), the "winner" is chosen non-deterministically.
index ea4b3bf8f561edbcc42af5a033af287e73dcb487..f9746d18b3b5eb2a1ca8d079112206b26c561ad9 100644 (file)
@@ -16,6 +16,8 @@
 
 #include <gtest/gtest.h>
 
+#include <boost/thread/barrier.hpp>
+
 #include <folly/futures/Future.h>
 #include <folly/Random.h>
 #include <folly/small_vector.h>
@@ -353,6 +355,134 @@ TEST(Collect, alreadyCompleted) {
   }
 }
 
+TEST(Collect, parallel) {
+  std::vector<Promise<int>> ps(10);
+  std::vector<Future<int>> fs;
+  for (size_t i = 0; i < ps.size(); i++) {
+    fs.emplace_back(ps[i].getFuture());
+  }
+  auto f = collect(fs);
+
+  std::vector<std::thread> ts;
+  boost::barrier barrier(ps.size() + 1);
+  for (size_t i = 0; i < ps.size(); i++) {
+    ts.emplace_back([&ps, &barrier, i]() {
+      barrier.wait();
+      ps[i].setValue(i);
+    });
+  }
+
+  barrier.wait();
+
+  for (size_t i = 0; i < ps.size(); i++) {
+    ts[i].join();
+  }
+
+  EXPECT_TRUE(f.isReady());
+  for (size_t i = 0; i < ps.size(); i++) {
+    EXPECT_EQ(i, f.value()[i]);
+  }
+}
+
+TEST(Collect, parallelWithError) {
+  std::vector<Promise<int>> ps(10);
+  std::vector<Future<int>> fs;
+  for (size_t i = 0; i < ps.size(); i++) {
+    fs.emplace_back(ps[i].getFuture());
+  }
+  auto f = collect(fs);
+
+  std::vector<std::thread> ts;
+  boost::barrier barrier(ps.size() + 1);
+  for (size_t i = 0; i < ps.size(); i++) {
+    ts.emplace_back([&ps, &barrier, i]() {
+      barrier.wait();
+      if (i == (ps.size()/2)) {
+        ps[i].setException(eggs);
+      } else {
+        ps[i].setValue(i);
+      }
+    });
+  }
+
+  barrier.wait();
+
+  for (size_t i = 0; i < ps.size(); i++) {
+    ts[i].join();
+  }
+
+  EXPECT_TRUE(f.isReady());
+  EXPECT_THROW(f.value(), eggs_t);
+}
+
+TEST(Collect, allParallel) {
+  std::vector<Promise<int>> ps(10);
+  std::vector<Future<int>> fs;
+  for (size_t i = 0; i < ps.size(); i++) {
+    fs.emplace_back(ps[i].getFuture());
+  }
+  auto f = collectAll(fs);
+
+  std::vector<std::thread> ts;
+  boost::barrier barrier(ps.size() + 1);
+  for (size_t i = 0; i < ps.size(); i++) {
+    ts.emplace_back([&ps, &barrier, i]() {
+      barrier.wait();
+      ps[i].setValue(i);
+    });
+  }
+
+  barrier.wait();
+
+  for (size_t i = 0; i < ps.size(); i++) {
+    ts[i].join();
+  }
+
+  EXPECT_TRUE(f.isReady());
+  for (size_t i = 0; i < ps.size(); i++) {
+    EXPECT_TRUE(f.value()[i].hasValue());
+    EXPECT_EQ(i, f.value()[i].value());
+  }
+}
+
+TEST(Collect, allParallelWithError) {
+  std::vector<Promise<int>> ps(10);
+  std::vector<Future<int>> fs;
+  for (size_t i = 0; i < ps.size(); i++) {
+    fs.emplace_back(ps[i].getFuture());
+  }
+  auto f = collectAll(fs);
+
+  std::vector<std::thread> ts;
+  boost::barrier barrier(ps.size() + 1);
+  for (size_t i = 0; i < ps.size(); i++) {
+    ts.emplace_back([&ps, &barrier, i]() {
+      barrier.wait();
+      if (i == (ps.size()/2)) {
+        ps[i].setException(eggs);
+      } else {
+        ps[i].setValue(i);
+      }
+    });
+  }
+
+  barrier.wait();
+
+  for (size_t i = 0; i < ps.size(); i++) {
+    ts[i].join();
+  }
+
+  EXPECT_TRUE(f.isReady());
+  for (size_t i = 0; i < ps.size(); i++) {
+    if (i == (ps.size()/2)) {
+      EXPECT_THROW(f.value()[i].value(), eggs_t);
+    } else {
+      EXPECT_TRUE(f.value()[i].hasValue());
+      EXPECT_EQ(i, f.value()[i].value());
+    }
+  }
+}
+
 TEST(Collect, collectN) {
   std::vector<Promise<void>> promises(10);
   std::vector<Future<void>> futures;
@@ -443,6 +573,59 @@ TEST(Collect, collectAllVariadicReferences) {
   EXPECT_TRUE(flag);
 }
 
+TEST(Collect, collectAllVariadicWithException) {
+  Promise<bool> pb;
+  Promise<int> pi;
+  Future<bool> fb = pb.getFuture();
+  Future<int> fi = pi.getFuture();
+  bool flag = false;
+  collectAll(std::move(fb), std::move(fi))
+    .then([&](std::tuple<Try<bool>, Try<int>> tup) {
+      flag = true;
+      EXPECT_TRUE(std::get<0>(tup).hasValue());
+      EXPECT_EQ(std::get<0>(tup).value(), true);
+      EXPECT_TRUE(std::get<1>(tup).hasException());
+      EXPECT_THROW(std::get<1>(tup).value(), eggs_t);
+    });
+  pb.setValue(true);
+  EXPECT_FALSE(flag);
+  pi.setException(eggs);
+  EXPECT_TRUE(flag);
+}
+
+TEST(Collect, collectVariadic) {
+  Promise<bool> pb;
+  Promise<int> pi;
+  Future<bool> fb = pb.getFuture();
+  Future<int> fi = pi.getFuture();
+  bool flag = false;
+  collect(std::move(fb), std::move(fi))
+    .then([&](std::tuple<bool, int> tup) {
+      flag = true;
+      EXPECT_EQ(std::get<0>(tup), true);
+      EXPECT_EQ(std::get<1>(tup), 42);
+    });
+  pb.setValue(true);
+  EXPECT_FALSE(flag);
+  pi.setValue(42);
+  EXPECT_TRUE(flag);
+}
+
+TEST(Collect, collectVariadicWithException) {
+  Promise<bool> pb;
+  Promise<int> pi;
+  Future<bool> fb = pb.getFuture();
+  Future<int> fi = pi.getFuture();
+  bool flag = false;
+  auto f = collect(std::move(fb), std::move(fi));
+  pb.setValue(true);
+  EXPECT_FALSE(f.isReady());
+  pi.setException(eggs);
+  EXPECT_TRUE(f.isReady());
+  EXPECT_TRUE(f.getTry().hasException());
+  EXPECT_THROW(f.get(), eggs_t);
+}
+
 TEST(Collect, collectAllNone) {
   std::vector<Future<int>> fs;
   auto f = collectAll(fs);
index 5d171bb9732f50f9c0e12476072b3a79d2e9a253..ec5cb5a188a3581df2b57451810fb860e69d543b 100644 (file)
@@ -413,6 +413,31 @@ TEST(Future, thenFunctionFuture) {
   EXPECT_EQ(f.value(), "start;static;class-static;class");
 }
 
+TEST(Future, thenStdFunction) {
+  {
+    std::function<int()> fn = [](){ return 42; };
+    auto f = makeFuture().then(std::move(fn));
+    EXPECT_EQ(f.value(), 42);
+  }
+  {
+    std::function<int(int)> fn = [](int i){ return i + 23; };
+    auto f = makeFuture(19).then(std::move(fn));
+    EXPECT_EQ(f.value(), 42);
+  }
+  {
+    std::function<int(Try<int>&)> fn = [](Try<int>& t){ return t.value() + 2; };
+    auto f = makeFuture(1).then(std::move(fn));
+    EXPECT_EQ(f.value(), 3);
+  }
+  {
+    bool flag = false;
+    std::function<void()> fn = [&flag](){ flag = true; };
+    auto f = makeFuture().then(std::move(fn));
+    EXPECT_TRUE(f.isReady());
+    EXPECT_TRUE(flag);
+  }
+}
+
 TEST(Future, thenBind) {
   auto l = []() {
     return makeFuture("bind");
index cc480fbe0f994b94b941b4c51ab9b168af08ffc9..9dbdb15682934e5e6a2ba3f9d2ae1c5e9b4e191f 100644 (file)
 
 #include <gtest/gtest.h>
 
+#include <boost/thread/barrier.hpp>
+
 #include <folly/futures/Future.h>
 
 #include <vector>
 
 using namespace folly;
 
+typedef FutureException eggs_t;
+static eggs_t eggs("eggs");
+
 TEST(Window, basic) {
   // int -> Future<int>
   auto fn = [](std::vector<int> input, size_t window_size, size_t expect) {
@@ -79,3 +84,107 @@ TEST(Window, basic) {
     EXPECT_EQ(6, res);
   }
 }
+
+TEST(Window, parallel) {
+  std::vector<int> input;
+  std::vector<Promise<int>> ps(10);
+  for (size_t i = 0; i < ps.size(); i++) {
+    input.emplace_back(i);
+  }
+  auto f = collect(window(input, [&](int i) {
+    return ps[i].getFuture();
+  }, 3));
+
+  std::vector<std::thread> ts;
+  boost::barrier barrier(ps.size() + 1);
+  for (size_t i = 0; i < ps.size(); i++) {
+    ts.emplace_back([&ps, &barrier, i]() {
+      barrier.wait();
+      ps[i].setValue(i);
+    });
+  }
+
+  barrier.wait();
+
+  for (size_t i = 0; i < ps.size(); i++) {
+    ts[i].join();
+  }
+
+  EXPECT_TRUE(f.isReady());
+  for (size_t i = 0; i < ps.size(); i++) {
+    EXPECT_EQ(i, f.value()[i]);
+  }
+}
+
+TEST(Window, parallelWithError) {
+  std::vector<int> input;
+  std::vector<Promise<int>> ps(10);
+  for (size_t i = 0; i < ps.size(); i++) {
+    input.emplace_back(i);
+  }
+  auto f = collect(window(input, [&](int i) {
+    return ps[i].getFuture();
+  }, 3));
+
+  std::vector<std::thread> ts;
+  boost::barrier barrier(ps.size() + 1);
+  for (size_t i = 0; i < ps.size(); i++) {
+    ts.emplace_back([&ps, &barrier, i]() {
+      barrier.wait();
+      if (i == (ps.size()/2)) {
+        ps[i].setException(eggs);
+      } else {
+        ps[i].setValue(i);
+      }
+    });
+  }
+
+  barrier.wait();
+
+  for (size_t i = 0; i < ps.size(); i++) {
+    ts[i].join();
+  }
+
+  EXPECT_TRUE(f.isReady());
+  EXPECT_THROW(f.value(), eggs_t);
+}
+
+TEST(Window, allParallelWithError) {
+  std::vector<int> input;
+  std::vector<Promise<int>> ps(10);
+  for (size_t i = 0; i < ps.size(); i++) {
+    input.emplace_back(i);
+  }
+  auto f = collectAll(window(input, [&](int i) {
+    return ps[i].getFuture();
+  }, 3));
+
+  std::vector<std::thread> ts;
+  boost::barrier barrier(ps.size() + 1);
+  for (size_t i = 0; i < ps.size(); i++) {
+    ts.emplace_back([&ps, &barrier, i]() {
+      barrier.wait();
+      if (i == (ps.size()/2)) {
+        ps[i].setException(eggs);
+      } else {
+        ps[i].setValue(i);
+      }
+    });
+  }
+
+  barrier.wait();
+
+  for (size_t i = 0; i < ps.size(); i++) {
+    ts[i].join();
+  }
+
+  EXPECT_TRUE(f.isReady());
+  for (size_t i = 0; i < ps.size(); i++) {
+    if (i == (ps.size()/2)) {
+      EXPECT_THROW(f.value()[i].value(), eggs_t);
+    } else {
+      EXPECT_TRUE(f.value()[i].hasValue());
+      EXPECT_EQ(i, f.value()[i].value());
+    }
+  }
+}