(Wangle) window
authorHannes Roth <hannesr@fb.com>
Thu, 14 May 2015 18:50:06 +0000 (11:50 -0700)
committerViswanath Sivakumar <viswanath@fb.com>
Wed, 20 May 2015 17:57:08 +0000 (10:57 -0700)
Summary: `window` creates up to `n` Futures at a time and only starts new ones when previous ones complete. A sliding window.

Test Plan: Run all the tests.

Reviewed By: hans@fb.com

Subscribers: bmatheny, henryf, scottstraw, juliafu, folly-diffs@, jsedgwick, yfeldblum, chalfant

FB internal diff: D2015310

Signature: t1:2015310:1431557556:1017006cc9c9c2562ebe2c3dabfc4dbf316ff408

folly/futures/Future-inl.h
folly/futures/helpers.h
folly/futures/test/FutureTest.cpp

index 28cb216f8ef8abb7f528ad109106084e5cabb8dc..f17eabfafc4be5cb14f4cafbde26121f103cf3d4 100644 (file)
@@ -739,6 +739,53 @@ Future<T> reduce(It first, It last, T&& initial, F&& func) {
   return f;
 }
 
+template <class Collection, class F, class ItT, class Result>
+std::vector<Future<Result>>
+window(Collection input, F func, size_t n) {
+  struct WindowContext {
+    WindowContext(Collection&& i, F&& fn)
+        : i_(0), input_(std::move(i)), promises_(input_.size()),
+          func_(std::move(fn))
+      {}
+    std::atomic<size_t> i_;
+    Collection input_;
+    std::vector<Promise<Result>> promises_;
+    F func_;
+
+    static inline void spawn(const std::shared_ptr<WindowContext>& ctx) {
+      size_t i = ctx->i_++;
+      if (i < ctx->input_.size()) {
+        // Using setCallback_ directly since we don't need the Future
+        ctx->func_(std::move(ctx->input_[i])).setCallback_(
+          // ctx is captured by value
+          [ctx, i](Try<ItT>&& t) {
+            ctx->promises_[i].setTry(std::move(t));
+            // Chain another future onto this one
+            spawn(std::move(ctx));
+          });
+      }
+    }
+  };
+
+  auto max = std::min(n, input.size());
+
+  auto ctx = std::make_shared<WindowContext>(
+    std::move(input), std::move(func));
+
+  for (size_t i = 0; i < max; ++i) {
+    // Start the first n Futures
+    WindowContext::spawn(ctx);
+  }
+
+  std::vector<Future<Result>> futures;
+  futures.reserve(ctx->promises_.size());
+  for (auto& promise : ctx->promises_) {
+    futures.emplace_back(promise.getFuture());
+  }
+
+  return futures;
+}
+
 template <class T>
 template <class I, class F>
 Future<I> Future<T>::reduce(I&& initial, F&& func) {
index c74564111da8bbda230ef3b8a0b484cf5e445db8..ded42702cce416a2a74cfacd56236b4e5249540d 100644 (file)
@@ -224,6 +224,21 @@ auto collectN(Collection&& c, size_t n)
   return collectN(c.begin(), c.end(), n);
 }
 
+/** window creates up to n Futures using the values
+    in the collection, and then another Future for each Future
+    that completes
+
+    this is basically a sliding window of Futures of size n
+
+    func must return a Future for each value in input
+  */
+template <class Collection, class F,
+          class ItT = typename std::iterator_traits<
+            typename Collection::iterator>::value_type,
+          class Result = typename detail::resultOf<F, ItT&&>::value_type>
+std::vector<Future<Result>>
+window(Collection input, F func, size_t n);
+
 template <typename F, typename T, typename ItT>
 using MaybeTryArg = typename std::conditional<
   detail::callableWith<F, T&&, Try<ItT>&&>::value, Try<ItT>, ItT>::type;
index faeb1da2fce190bc419962741d6c1c2a8958555b..96b5cd5fbd9237c8ed5567bd0e49aafe3d4994ac 100644 (file)
@@ -690,6 +690,36 @@ TEST(Future, unwrap) {
   EXPECT_EQ(7, f.value());
 }
 
+TEST(Future, stream) {
+  auto fn = [](vector<int> input, size_t window_size, size_t expect) {
+    auto res = reduce(
+      window(
+        input,
+        [](int i) { return makeFuture(i); },
+        2),
+      0,
+      [](int sum, const Try<int>& b) {
+        return sum + *b;
+      }).get();
+    EXPECT_EQ(expect, res);
+  };
+  {
+    // streaming 2 at a time
+    vector<int> input = {1, 2, 3};
+    fn(input, 2, 6);
+  }
+  {
+    // streaming 4 at a time
+    vector<int> input = {1, 2, 3};
+    fn(input, 4, 6);
+  }
+  {
+    // empty inpt
+    vector<int> input;
+    fn(input, 1, 0);
+  }
+}
+
 TEST(Future, collectAll) {
   // returns a vector variant
   {