(Wangle) unorderedReduce
authorHannes Roth <hannesr@fb.com>
Thu, 14 May 2015 22:36:27 +0000 (15:36 -0700)
committerViswanath Sivakumar <viswanath@fb.com>
Wed, 20 May 2015 17:57:10 +0000 (10:57 -0700)
Summary:
Use this if you don't need the order of the input, e.g. summing up
values. This constructs a separate Future chain to do the reducing,
because we don't want to add locking while reducing. The only lock
necessary is when adding a new Future to the chain, which should be
really quick.

Test Plan: Run all the tests.

Reviewed By: hans@fb.com

Subscribers: folly-diffs@, jsedgwick, yfeldblum, chalfant

FB internal diff: D2015326

Tasks: 6025252

Signature: t1:2015326:1431557191:9ea2edccb0162dedf067b5b3300de2fe72a1a4c9

folly/futures/Future-inl.h
folly/futures/helpers.h
folly/futures/test/FutureTest.cpp

index aecfa5784d694c2e0d8353ede95776e6f4a40001..980725247f2e09438ea3a0c9e4aae8886e826672 100644 (file)
@@ -800,6 +800,54 @@ Future<I> Future<T>::reduce(I&& initial, F&& func) {
   });
 }
 
+template <class It, class T, class F, class ItT, class Arg>
+Future<T> unorderedReduce(It first, It last, T initial, F func) {
+  if (first == last) {
+    return makeFuture(std::move(initial));
+  }
+
+  typedef isTry<Arg> IsTry;
+
+  struct UnorderedReduceContext {
+    UnorderedReduceContext(T&& memo, F&& fn, size_t n)
+        : lock_(), memo_(makeFuture<T>(std::move(memo))),
+          func_(std::move(fn)), numThens_(0), numFutures_(n), promise_()
+      {};
+    folly::MicroSpinLock lock_; // protects memo_ and numThens_
+    Future<T> memo_;
+    F func_;
+    size_t numThens_; // how many Futures completed and called .then()
+    size_t numFutures_; // how many Futures in total
+    Promise<T> promise_;
+  };
+
+  auto ctx = std::make_shared<UnorderedReduceContext>(
+    std::move(initial), std::move(func), std::distance(first, last));
+
+  mapSetCallback<ItT>(first, last, [ctx](size_t i, Try<ItT>&& t) {
+    folly::MoveWrapper<Try<ItT>> mt(std::move(t));
+    // Futures can be completed in any order, simultaneously.
+    // To make this non-blocking, we create a new Future chain in
+    // the order of completion to reduce the values.
+    // The spinlock just protects chaining a new Future, not actually
+    // executing the reduce, which should be really fast.
+    folly::MSLGuard lock(ctx->lock_);
+    ctx->memo_ = ctx->memo_.then([ctx, mt](T&& v) mutable {
+      // Either return a ItT&& or a Try<ItT>&& depending
+      // on the type of the argument of func.
+      return ctx->func_(std::move(v), mt->template get<IsTry::value, Arg&&>());
+    });
+    if (++ctx->numThens_ == ctx->numFutures_) {
+      // After reducing the value of the last Future, fulfill the Promise
+      ctx->memo_.setCallback_([ctx](Try<T>&& t2) {
+        ctx->promise_.setValue(std::move(t2));
+      });
+    }
+  });
+
+  return ctx->promise_.getFuture();
+}
+
 template <class T>
 Future<T> Future<T>::within(Duration dur, Timekeeper* tk) {
   return within(dur, TimedOut(), tk);
index 775989e8a5b9295e9f0edd072eeaef0cd0650a31..010a5a78cd3012ebedc6528fe521bf9098300c48 100644 (file)
@@ -239,6 +239,9 @@ using isFutureResult = isFuture<typename std::result_of<F(T&&, Arg&&)>::type>;
     The type of the final result is a Future of the type of the initial value.
 
     Func can either return a T, or a Future<T>
+
+    func is called in order of the input, see unorderedReduce if that is not
+    a requirement
   */
 template <class It, class T, class F>
 Future<T> reduce(It first, It last, T&& initial, F&& func);
@@ -255,4 +258,24 @@ auto reduce(Collection&& c, T&& initial, F&& func)
       std::forward<F>(func));
 }
 
+/** like reduce, but calls func on finished futures as they complete
+    does NOT keep the order of the input
+  */
+template <class It, class T, class F,
+          class ItT = typename std::iterator_traits<It>::value_type::value_type,
+          class Arg = MaybeTryArg<F, T, ItT>>
+Future<T> unorderedReduce(It first, It last, T initial, F func);
+
+/// Sugar for the most common case
+template <class Collection, class T, class F>
+auto unorderedReduce(Collection&& c, T&& initial, F&& func)
+    -> decltype(unorderedReduce(c.begin(), c.end(), std::forward<T>(initial),
+                std::forward<F>(func))) {
+  return unorderedReduce(
+      c.begin(),
+      c.end(),
+      std::forward<T>(initial),
+      std::forward<F>(func));
+}
+
 } // namespace folly
index 96b5cd5fbd9237c8ed5567bd0e49aafe3d4994ac..bff61b0c0b4c1ef78eb81e24f7296cc47351f73f 100644 (file)
@@ -1807,6 +1807,60 @@ TEST(Reduce, Chain) {
   }
 }
 
+TEST(Reduce, Streaming) {
+  {
+    std::vector<Future<int>> fs;
+    fs.push_back(makeFuture(1));
+    fs.push_back(makeFuture(2));
+    fs.push_back(makeFuture(3));
+
+    Future<double> f = unorderedReduce(fs.begin(), fs.end(), 0.0,
+      [](double a, int&& b){
+        return double(b);
+      });
+    EXPECT_EQ(3.0, f.get());
+  }
+  {
+    Promise<int> p1;
+    Promise<int> p2;
+    Promise<int> p3;
+
+    std::vector<Future<int>> fs;
+    fs.push_back(p1.getFuture());
+    fs.push_back(p2.getFuture());
+    fs.push_back(p3.getFuture());
+
+    Future<double> f = unorderedReduce(fs.begin(), fs.end(), 0.0,
+      [](double a, int&& b){
+        return double(b);
+      });
+    p3.setValue(3);
+    p2.setValue(2);
+    p1.setValue(1);
+    EXPECT_EQ(1.0, f.get());
+  }
+}
+
+TEST(Reduce, StreamingException) {
+  Promise<int> p1;
+  Promise<int> p2;
+  Promise<int> p3;
+
+  std::vector<Future<int>> fs;
+  fs.push_back(p1.getFuture());
+  fs.push_back(p2.getFuture());
+  fs.push_back(p3.getFuture());
+
+  Future<double> f = unorderedReduce(fs.begin(), fs.end(), 0.0,
+    [](double a, int&& b){
+      return b + 0.0;
+    });
+  p3.setValue(3);
+  p2.setException(exception_wrapper(std::runtime_error("blah")));
+  p1.setValue(1);
+  EXPECT_THROW(f.get(), std::runtime_error);
+}
+
 TEST(Map, Basic) {
   Promise<int> p1;
   Promise<int> p2;