Add window overload that takes an executor to prevent stack overflow
[folly.git] / folly / futures / test / WindowTest.cpp
index c2105f0a7b2d15b53391dad57a0baef89c20606d..098fc7a8fef9a7e02eb41569b4d18b8802346d84 100644 (file)
@@ -18,6 +18,7 @@
 
 #include <folly/Conv.h>
 #include <folly/futures/Future.h>
+#include <folly/futures/ManualExecutor.h>
 #include <folly/portability/GTest.h>
 
 #include <vector>
@@ -42,17 +43,17 @@ TEST(Window, basic) {
     EXPECT_EQ(expect, res);
   };
   {
-    // 2 in-flight at a time
+    SCOPED_TRACE("2 in-flight at a time");
     std::vector<int> input = {1, 2, 3};
     fn(input, 2, 6);
   }
   {
-    // 4 in-flight at a time
+    SCOPED_TRACE("4 in-flight at a time");
     std::vector<int> input = {1, 2, 3};
     fn(input, 4, 6);
   }
   {
-    // empty input
+    SCOPED_TRACE("empty input");
     std::vector<int> input;
     fn(input, 1, 0);
   }
@@ -104,8 +105,8 @@ TEST(Window, parallel) {
 
   barrier.wait();
 
-  for (size_t i = 0; i < ps.size(); i++) {
-    ts[i].join();
+  for (auto& t : ts) {
+    t.join();
   }
 
   EXPECT_TRUE(f.isReady());
@@ -139,8 +140,8 @@ TEST(Window, parallelWithError) {
 
   barrier.wait();
 
-  for (size_t i = 0; i < ps.size(); i++) {
-    ts[i].join();
+  for (auto& t : ts) {
+    t.join();
   }
 
   EXPECT_TRUE(f.isReady());
@@ -172,8 +173,8 @@ TEST(Window, allParallelWithError) {
 
   barrier.wait();
 
-  for (size_t i = 0; i < ps.size(); i++) {
-    ts[i].join();
+  for (auto& t : ts) {
+    t.join();
   }
 
   EXPECT_TRUE(f.isReady());
@@ -186,3 +187,173 @@ TEST(Window, allParallelWithError) {
     }
   }
 }
+
+TEST(WindowExecutor, basic) {
+  ManualExecutor executor;
+
+  // int -> Future<int>
+  auto fn = [executor_ = &executor](
+                std::vector<int> input, size_t window_size, size_t expect) {
+    auto res = reduce(
+        window(
+            executor_, input, [](int i) { return makeFuture(i); }, window_size),
+        0,
+        [](int sum, const Try<int>& b) { return sum + *b; });
+    executor_->waitFor(res);
+    EXPECT_EQ(expect, res.get());
+  };
+  {
+    SCOPED_TRACE("2 in-flight at a time");
+    std::vector<int> input = {1, 2, 3};
+    fn(input, 2, 6);
+  }
+  {
+    SCOPED_TRACE("4 in-flight at a time");
+    std::vector<int> input = {1, 2, 3};
+    fn(input, 4, 6);
+  }
+  {
+    SCOPED_TRACE("empty input");
+    std::vector<int> input;
+    fn(input, 1, 0);
+  }
+  {
+    // int -> Future<Unit>
+    auto res = reduce(
+        window(
+            &executor,
+            std::vector<int>({1, 2, 3}),
+            [](int /* i */) { return makeFuture(); },
+            2),
+        0,
+        [](int sum, const Try<Unit>& b) {
+          EXPECT_TRUE(b.hasValue());
+          return sum + 1;
+        });
+    executor.waitFor(res);
+    EXPECT_EQ(3, res.get());
+  }
+  {
+    // string -> return Future<int>
+    auto res = reduce(
+        window(
+            &executor,
+            std::vector<std::string>{"1", "2", "3"},
+            [](std::string s) { return makeFuture<int>(folly::to<int>(s)); },
+            2),
+        0,
+        [](int sum, const Try<int>& b) { return sum + *b; });
+    executor.waitFor(res);
+    EXPECT_EQ(6, res.get());
+  }
+}
+
+TEST(WindowExecutor, parallel) {
+  ManualExecutor executor;
+
+  std::vector<int> input;
+  std::vector<Promise<int>> ps(10);
+  for (size_t i = 0; i < ps.size(); i++) {
+    input.emplace_back(i);
+  }
+  auto f = collect(
+      window(&executor, input, [&](int i) { return ps[i].getFuture(); }, 3));
+
+  std::vector<std::thread> ts;
+  boost::barrier barrier(ps.size() + 1);
+  for (size_t i = 0; i < ps.size(); i++) {
+    ts.emplace_back([&ps, &barrier, i]() {
+      barrier.wait();
+      ps[i].setValue(i);
+    });
+  }
+
+  barrier.wait();
+
+  for (auto& t : ts) {
+    t.join();
+  }
+
+  executor.waitFor(f);
+  EXPECT_TRUE(f.isReady());
+  for (size_t i = 0; i < ps.size(); i++) {
+    EXPECT_EQ(i, f.value()[i]);
+  }
+}
+
+TEST(WindowExecutor, parallelWithError) {
+  ManualExecutor executor;
+
+  std::vector<int> input;
+  std::vector<Promise<int>> ps(10);
+  for (size_t i = 0; i < ps.size(); i++) {
+    input.emplace_back(i);
+  }
+  auto f = collect(
+      window(&executor, input, [&](int i) { return ps[i].getFuture(); }, 3));
+
+  std::vector<std::thread> ts;
+  boost::barrier barrier(ps.size() + 1);
+  for (size_t i = 0; i < ps.size(); i++) {
+    ts.emplace_back([&ps, &barrier, i]() {
+      barrier.wait();
+      if (i == (ps.size() / 2)) {
+        ps[i].setException(eggs);
+      } else {
+        ps[i].setValue(i);
+      }
+    });
+  }
+
+  barrier.wait();
+
+  for (auto& t : ts) {
+    t.join();
+  }
+
+  executor.waitFor(f);
+  EXPECT_TRUE(f.isReady());
+  EXPECT_THROW(f.value(), eggs_t);
+}
+
+TEST(WindowExecutor, allParallelWithError) {
+  ManualExecutor executor;
+
+  std::vector<int> input;
+  std::vector<Promise<int>> ps(10);
+  for (size_t i = 0; i < ps.size(); i++) {
+    input.emplace_back(i);
+  }
+  auto f = collectAll(
+      window(&executor, input, [&](int i) { return ps[i].getFuture(); }, 3));
+
+  std::vector<std::thread> ts;
+  boost::barrier barrier(ps.size() + 1);
+  for (size_t i = 0; i < ps.size(); i++) {
+    ts.emplace_back([&ps, &barrier, i]() {
+      barrier.wait();
+      if (i == (ps.size() / 2)) {
+        ps[i].setException(eggs);
+      } else {
+        ps[i].setValue(i);
+      }
+    });
+  }
+
+  barrier.wait();
+
+  for (auto& t : ts) {
+    t.join();
+  }
+
+  executor.waitFor(f);
+  EXPECT_TRUE(f.isReady());
+  for (size_t i = 0; i < ps.size(); i++) {
+    if (i == (ps.size() / 2)) {
+      EXPECT_THROW(f.value()[i].value(), eggs_t);
+    } else {
+      EXPECT_TRUE(f.value()[i].hasValue());
+      EXPECT_EQ(i, f.value()[i].value());
+    }
+  }
+}