Add window overload that takes an executor to prevent stack overflow
authorWalker Mills <wmills@fb.com>
Thu, 19 Oct 2017 07:57:03 +0000 (00:57 -0700)
committerFacebook Github Bot <facebook-github-bot@users.noreply.github.com>
Thu, 19 Oct 2017 08:09:37 +0000 (01:09 -0700)
Summary:
AIUI, if there is no executor available, then callbacks are executed inline. `folly::window` uses a recursive helper function (`spawn`) to handle chaining callbacks. So if `window` is used on a large enough collection of `Future`s without executors (e.g., created by `makeFuture`, or have otherwise already completed), you get a stack overflow. A minimal repro looks like:
```

int main(int argc, char** argv) {
  std::vector<int> v(100000);
  for(int i=0; i < v.size(); i++) {
    v[i] = i;
  }

  std::vector<folly::Future<folly::Unit>> f =
      folly::window(
          std::move(v),
          [](int /* unused */) { return folly::makeFuture(); },
          1);
  folly::collectAll(f).get();
}
```

This diff resolves the issue by adding an overload of `folly::window` which takes an executor as its first parameter. The executor-less `window` overload calls through to the new function using an `InlineExecutor` as the default executor.

Reviewed By: yfeldblum

Differential Revision: D6038733

fbshipit-source-id: 5dcab575592650efa2e106f12632ec06817a0009

folly/futures/Future-inl.h
folly/futures/helpers.h
folly/futures/test/WindowTest.cpp

index c6ea59a..258c9f8 100644 (file)
@@ -23,6 +23,7 @@
 
 #include <folly/Baton.h>
 #include <folly/Optional.h>
+#include <folly/futures/InlineExecutor.h>
 #include <folly/futures/Timekeeper.h>
 #include <folly/futures/detail/Core.h>
 
@@ -1067,27 +1068,38 @@ Future<T> reduce(It first, It last, T&& initial, F&& func) {
 template <class Collection, class F, class ItT, class Result>
 std::vector<Future<Result>>
 window(Collection input, F func, size_t n) {
-  struct WindowContext {
-    WindowContext(Collection&& i, F&& fn)
-        : input_(std::move(i)), promises_(input_.size()),
-          func_(std::move(fn))
-      {}
-    std::atomic<size_t> i_ {0};
-    Collection input_;
-    std::vector<Promise<Result>> promises_;
-    F func_;
+  // Use global inline executor singleton
+  auto executor = &InlineExecutor::instance();
+  return window(executor, std::move(input), std::move(func), n);
+}
 
-    static inline void spawn(const std::shared_ptr<WindowContext>& ctx) {
-      size_t i = ctx->i_++;
-      if (i < ctx->input_.size()) {
-        // Using setCallback_ directly since we don't need the Future
-        ctx->func_(std::move(ctx->input_[i])).setCallback_(
-          // ctx is captured by value
-          [ctx, i](Try<Result>&& t) {
-            ctx->promises_[i].setTry(std::move(t));
+template <class Collection, class F, class ItT, class Result>
+std::vector<Future<Result>>
+window(Executor* executor, Collection input, F func, size_t n) {
+  struct WindowContext {
+    WindowContext(Executor* executor_, Collection&& input_, F&& func_)
+        : executor(executor_),
+          input(std::move(input_)),
+          promises(input.size()),
+          func(std::move(func_)) {}
+    std::atomic<size_t> i{0};
+    Executor* executor;
+    Collection input;
+    std::vector<Promise<Result>> promises;
+    F func;
+
+    static inline void spawn(std::shared_ptr<WindowContext> ctx) {
+      size_t i = ctx->i++;
+      if (i < ctx->input.size()) {
+        auto fut = ctx->func(std::move(ctx->input[i]));
+        fut.setCallback_([ctx = std::move(ctx), i](Try<Result>&& t) mutable {
+          const auto executor_ = ctx->executor;
+          executor_->add([ctx = std::move(ctx), i, t = std::move(t)]() mutable {
+            ctx->promises[i].setTry(std::move(t));
             // Chain another future onto this one
             spawn(std::move(ctx));
           });
+        });
       }
     }
   };
@@ -1095,16 +1107,16 @@ window(Collection input, F func, size_t n) {
   auto max = std::min(n, input.size());
 
   auto ctx = std::make_shared<WindowContext>(
-    std::move(input), std::move(func));
+      executor, std::move(input), std::move(func));
 
+  // Start the first n Futures
   for (size_t i = 0; i < max; ++i) {
-    // Start the first n Futures
-    WindowContext::spawn(ctx);
+    executor->add([ctx]() mutable { WindowContext::spawn(std::move(ctx)); });
   }
 
   std::vector<Future<Result>> futures;
-  futures.reserve(ctx->promises_.size());
-  for (auto& promise : ctx->promises_) {
+  futures.reserve(ctx->promises.size());
+  for (auto& promise : ctx->promises) {
     futures.emplace_back(promise.getFuture());
   }
 
index ff132af..a5198e7 100644 (file)
@@ -328,6 +328,15 @@ template <
     class Result = typename futures::detail::resultOf<F, ItT&&>::value_type>
 std::vector<Future<Result>> window(Collection input, F func, size_t n);
 
+template <
+    class Collection,
+    class F,
+    class ItT = typename std::iterator_traits<
+        typename Collection::iterator>::value_type,
+    class Result = typename futures::detail::resultOf<F, ItT&&>::value_type>
+std::vector<Future<Result>>
+window(Executor* executor, Collection input, F func, size_t n);
+
 template <typename F, typename T, typename ItT>
 using MaybeTryArg = typename std::conditional<
     futures::detail::callableWith<F, T&&, Try<ItT>&&>::value,
index c2105f0..098fc7a 100644 (file)
@@ -18,6 +18,7 @@
 
 #include <folly/Conv.h>
 #include <folly/futures/Future.h>
+#include <folly/futures/ManualExecutor.h>
 #include <folly/portability/GTest.h>
 
 #include <vector>
@@ -42,17 +43,17 @@ TEST(Window, basic) {
     EXPECT_EQ(expect, res);
   };
   {
-    // 2 in-flight at a time
+    SCOPED_TRACE("2 in-flight at a time");
     std::vector<int> input = {1, 2, 3};
     fn(input, 2, 6);
   }
   {
-    // 4 in-flight at a time
+    SCOPED_TRACE("4 in-flight at a time");
     std::vector<int> input = {1, 2, 3};
     fn(input, 4, 6);
   }
   {
-    // empty input
+    SCOPED_TRACE("empty input");
     std::vector<int> input;
     fn(input, 1, 0);
   }
@@ -104,8 +105,8 @@ TEST(Window, parallel) {
 
   barrier.wait();
 
-  for (size_t i = 0; i < ps.size(); i++) {
-    ts[i].join();
+  for (auto& t : ts) {
+    t.join();
   }
 
   EXPECT_TRUE(f.isReady());
@@ -139,8 +140,8 @@ TEST(Window, parallelWithError) {
 
   barrier.wait();
 
-  for (size_t i = 0; i < ps.size(); i++) {
-    ts[i].join();
+  for (auto& t : ts) {
+    t.join();
   }
 
   EXPECT_TRUE(f.isReady());
@@ -172,8 +173,8 @@ TEST(Window, allParallelWithError) {
 
   barrier.wait();
 
-  for (size_t i = 0; i < ps.size(); i++) {
-    ts[i].join();
+  for (auto& t : ts) {
+    t.join();
   }
 
   EXPECT_TRUE(f.isReady());
@@ -186,3 +187,173 @@ TEST(Window, allParallelWithError) {
     }
   }
 }
+
+TEST(WindowExecutor, basic) {
+  ManualExecutor executor;
+
+  // int -> Future<int>
+  auto fn = [executor_ = &executor](
+                std::vector<int> input, size_t window_size, size_t expect) {
+    auto res = reduce(
+        window(
+            executor_, input, [](int i) { return makeFuture(i); }, window_size),
+        0,
+        [](int sum, const Try<int>& b) { return sum + *b; });
+    executor_->waitFor(res);
+    EXPECT_EQ(expect, res.get());
+  };
+  {
+    SCOPED_TRACE("2 in-flight at a time");
+    std::vector<int> input = {1, 2, 3};
+    fn(input, 2, 6);
+  }
+  {
+    SCOPED_TRACE("4 in-flight at a time");
+    std::vector<int> input = {1, 2, 3};
+    fn(input, 4, 6);
+  }
+  {
+    SCOPED_TRACE("empty input");
+    std::vector<int> input;
+    fn(input, 1, 0);
+  }
+  {
+    // int -> Future<Unit>
+    auto res = reduce(
+        window(
+            &executor,
+            std::vector<int>({1, 2, 3}),
+            [](int /* i */) { return makeFuture(); },
+            2),
+        0,
+        [](int sum, const Try<Unit>& b) {
+          EXPECT_TRUE(b.hasValue());
+          return sum + 1;
+        });
+    executor.waitFor(res);
+    EXPECT_EQ(3, res.get());
+  }
+  {
+    // string -> return Future<int>
+    auto res = reduce(
+        window(
+            &executor,
+            std::vector<std::string>{"1", "2", "3"},
+            [](std::string s) { return makeFuture<int>(folly::to<int>(s)); },
+            2),
+        0,
+        [](int sum, const Try<int>& b) { return sum + *b; });
+    executor.waitFor(res);
+    EXPECT_EQ(6, res.get());
+  }
+}
+
+TEST(WindowExecutor, parallel) {
+  ManualExecutor executor;
+
+  std::vector<int> input;
+  std::vector<Promise<int>> ps(10);
+  for (size_t i = 0; i < ps.size(); i++) {
+    input.emplace_back(i);
+  }
+  auto f = collect(
+      window(&executor, input, [&](int i) { return ps[i].getFuture(); }, 3));
+
+  std::vector<std::thread> ts;
+  boost::barrier barrier(ps.size() + 1);
+  for (size_t i = 0; i < ps.size(); i++) {
+    ts.emplace_back([&ps, &barrier, i]() {
+      barrier.wait();
+      ps[i].setValue(i);
+    });
+  }
+
+  barrier.wait();
+
+  for (auto& t : ts) {
+    t.join();
+  }
+
+  executor.waitFor(f);
+  EXPECT_TRUE(f.isReady());
+  for (size_t i = 0; i < ps.size(); i++) {
+    EXPECT_EQ(i, f.value()[i]);
+  }
+}
+
+TEST(WindowExecutor, parallelWithError) {
+  ManualExecutor executor;
+
+  std::vector<int> input;
+  std::vector<Promise<int>> ps(10);
+  for (size_t i = 0; i < ps.size(); i++) {
+    input.emplace_back(i);
+  }
+  auto f = collect(
+      window(&executor, input, [&](int i) { return ps[i].getFuture(); }, 3));
+
+  std::vector<std::thread> ts;
+  boost::barrier barrier(ps.size() + 1);
+  for (size_t i = 0; i < ps.size(); i++) {
+    ts.emplace_back([&ps, &barrier, i]() {
+      barrier.wait();
+      if (i == (ps.size() / 2)) {
+        ps[i].setException(eggs);
+      } else {
+        ps[i].setValue(i);
+      }
+    });
+  }
+
+  barrier.wait();
+
+  for (auto& t : ts) {
+    t.join();
+  }
+
+  executor.waitFor(f);
+  EXPECT_TRUE(f.isReady());
+  EXPECT_THROW(f.value(), eggs_t);
+}
+
+TEST(WindowExecutor, allParallelWithError) {
+  ManualExecutor executor;
+
+  std::vector<int> input;
+  std::vector<Promise<int>> ps(10);
+  for (size_t i = 0; i < ps.size(); i++) {
+    input.emplace_back(i);
+  }
+  auto f = collectAll(
+      window(&executor, input, [&](int i) { return ps[i].getFuture(); }, 3));
+
+  std::vector<std::thread> ts;
+  boost::barrier barrier(ps.size() + 1);
+  for (size_t i = 0; i < ps.size(); i++) {
+    ts.emplace_back([&ps, &barrier, i]() {
+      barrier.wait();
+      if (i == (ps.size() / 2)) {
+        ps[i].setException(eggs);
+      } else {
+        ps[i].setValue(i);
+      }
+    });
+  }
+
+  barrier.wait();
+
+  for (auto& t : ts) {
+    t.join();
+  }
+
+  executor.waitFor(f);
+  EXPECT_TRUE(f.isReady());
+  for (size_t i = 0; i < ps.size(); i++) {
+    if (i == (ps.size() / 2)) {
+      EXPECT_THROW(f.value()[i].value(), eggs_t);
+    } else {
+      EXPECT_TRUE(f.value()[i].hasValue());
+      EXPECT_EQ(i, f.value()[i].value());
+    }
+  }
+}