move InlineExecutor, ManualExecutor, and GlobalThreadPoolList to
[folly.git] / folly / futures / test / WindowTest.cpp
index 6eecedc199ef544d90155c475b04f447acb91ecf..90158d4749ce36e1968e664346c38e4e9fc8cf48 100644 (file)
@@ -1,5 +1,5 @@
 /*
- * Copyright 2016 Facebook, Inc.
+ * Copyright 2017 Facebook, Inc.
  *
  * Licensed under the Apache License, Version 2.0 (the "License");
  * you may not use this file except in compliance with the License.
  * limitations under the License.
  */
 
-#include <gtest/gtest.h>
-
 #include <boost/thread/barrier.hpp>
 
 #include <folly/Conv.h>
+#include <folly/executors/ManualExecutor.h>
 #include <folly/futures/Future.h>
+#include <folly/portability/GTest.h>
 
 #include <vector>
 
@@ -43,17 +43,17 @@ TEST(Window, basic) {
     EXPECT_EQ(expect, res);
   };
   {
-    // 2 in-flight at a time
+    SCOPED_TRACE("2 in-flight at a time");
     std::vector<int> input = {1, 2, 3};
     fn(input, 2, 6);
   }
   {
-    // 4 in-flight at a time
+    SCOPED_TRACE("4 in-flight at a time");
     std::vector<int> input = {1, 2, 3};
     fn(input, 4, 6);
   }
   {
-    // empty input
+    SCOPED_TRACE("empty input");
     std::vector<int> input;
     fn(input, 1, 0);
   }
@@ -105,8 +105,8 @@ TEST(Window, parallel) {
 
   barrier.wait();
 
-  for (size_t i = 0; i < ps.size(); i++) {
-    ts[i].join();
+  for (auto& t : ts) {
+    t.join();
   }
 
   EXPECT_TRUE(f.isReady());
@@ -140,8 +140,8 @@ TEST(Window, parallelWithError) {
 
   barrier.wait();
 
-  for (size_t i = 0; i < ps.size(); i++) {
-    ts[i].join();
+  for (auto& t : ts) {
+    t.join();
   }
 
   EXPECT_TRUE(f.isReady());
@@ -173,8 +173,8 @@ TEST(Window, allParallelWithError) {
 
   barrier.wait();
 
-  for (size_t i = 0; i < ps.size(); i++) {
-    ts[i].join();
+  for (auto& t : ts) {
+    t.join();
   }
 
   EXPECT_TRUE(f.isReady());
@@ -187,3 +187,173 @@ TEST(Window, allParallelWithError) {
     }
   }
 }
+
+TEST(WindowExecutor, basic) {
+  ManualExecutor executor;
+
+  // int -> Future<int>
+  auto fn = [executor_ = &executor](
+                std::vector<int> input, size_t window_size, size_t expect) {
+    auto res = reduce(
+        window(
+            executor_, input, [](int i) { return makeFuture(i); }, window_size),
+        0,
+        [](int sum, const Try<int>& b) { return sum + *b; });
+    executor_->waitFor(res);
+    EXPECT_EQ(expect, res.get());
+  };
+  {
+    SCOPED_TRACE("2 in-flight at a time");
+    std::vector<int> input = {1, 2, 3};
+    fn(input, 2, 6);
+  }
+  {
+    SCOPED_TRACE("4 in-flight at a time");
+    std::vector<int> input = {1, 2, 3};
+    fn(input, 4, 6);
+  }
+  {
+    SCOPED_TRACE("empty input");
+    std::vector<int> input;
+    fn(input, 1, 0);
+  }
+  {
+    // int -> Future<Unit>
+    auto res = reduce(
+        window(
+            &executor,
+            std::vector<int>({1, 2, 3}),
+            [](int /* i */) { return makeFuture(); },
+            2),
+        0,
+        [](int sum, const Try<Unit>& b) {
+          EXPECT_TRUE(b.hasValue());
+          return sum + 1;
+        });
+    executor.waitFor(res);
+    EXPECT_EQ(3, res.get());
+  }
+  {
+    // string -> return Future<int>
+    auto res = reduce(
+        window(
+            &executor,
+            std::vector<std::string>{"1", "2", "3"},
+            [](std::string s) { return makeFuture<int>(folly::to<int>(s)); },
+            2),
+        0,
+        [](int sum, const Try<int>& b) { return sum + *b; });
+    executor.waitFor(res);
+    EXPECT_EQ(6, res.get());
+  }
+}
+
+TEST(WindowExecutor, parallel) {
+  ManualExecutor executor;
+
+  std::vector<int> input;
+  std::vector<Promise<int>> ps(10);
+  for (size_t i = 0; i < ps.size(); i++) {
+    input.emplace_back(i);
+  }
+  auto f = collect(
+      window(&executor, input, [&](int i) { return ps[i].getFuture(); }, 3));
+
+  std::vector<std::thread> ts;
+  boost::barrier barrier(ps.size() + 1);
+  for (size_t i = 0; i < ps.size(); i++) {
+    ts.emplace_back([&ps, &barrier, i]() {
+      barrier.wait();
+      ps[i].setValue(i);
+    });
+  }
+
+  barrier.wait();
+
+  for (auto& t : ts) {
+    t.join();
+  }
+
+  executor.waitFor(f);
+  EXPECT_TRUE(f.isReady());
+  for (size_t i = 0; i < ps.size(); i++) {
+    EXPECT_EQ(i, f.value()[i]);
+  }
+}
+
+TEST(WindowExecutor, parallelWithError) {
+  ManualExecutor executor;
+
+  std::vector<int> input;
+  std::vector<Promise<int>> ps(10);
+  for (size_t i = 0; i < ps.size(); i++) {
+    input.emplace_back(i);
+  }
+  auto f = collect(
+      window(&executor, input, [&](int i) { return ps[i].getFuture(); }, 3));
+
+  std::vector<std::thread> ts;
+  boost::barrier barrier(ps.size() + 1);
+  for (size_t i = 0; i < ps.size(); i++) {
+    ts.emplace_back([&ps, &barrier, i]() {
+      barrier.wait();
+      if (i == (ps.size() / 2)) {
+        ps[i].setException(eggs);
+      } else {
+        ps[i].setValue(i);
+      }
+    });
+  }
+
+  barrier.wait();
+
+  for (auto& t : ts) {
+    t.join();
+  }
+
+  executor.waitFor(f);
+  EXPECT_TRUE(f.isReady());
+  EXPECT_THROW(f.value(), eggs_t);
+}
+
+TEST(WindowExecutor, allParallelWithError) {
+  ManualExecutor executor;
+
+  std::vector<int> input;
+  std::vector<Promise<int>> ps(10);
+  for (size_t i = 0; i < ps.size(); i++) {
+    input.emplace_back(i);
+  }
+  auto f = collectAll(
+      window(&executor, input, [&](int i) { return ps[i].getFuture(); }, 3));
+
+  std::vector<std::thread> ts;
+  boost::barrier barrier(ps.size() + 1);
+  for (size_t i = 0; i < ps.size(); i++) {
+    ts.emplace_back([&ps, &barrier, i]() {
+      barrier.wait();
+      if (i == (ps.size() / 2)) {
+        ps[i].setException(eggs);
+      } else {
+        ps[i].setValue(i);
+      }
+    });
+  }
+
+  barrier.wait();
+
+  for (auto& t : ts) {
+    t.join();
+  }
+
+  executor.waitFor(f);
+  EXPECT_TRUE(f.isReady());
+  for (size_t i = 0; i < ps.size(); i++) {
+    if (i == (ps.size() / 2)) {
+      EXPECT_THROW(f.value()[i].value(), eggs_t);
+    } else {
+      EXPECT_TRUE(f.value()[i].hasValue());
+      EXPECT_EQ(i, f.value()[i].value());
+    }
+  }
+}