(Wangle) unorderedReduce
[folly.git] / folly / futures / test / FutureTest.cpp
index f252925346ea3ccdb7de58ee9cdf0da1555a3a17..bff61b0c0b4c1ef78eb81e24f7296cc47351f73f 100644 (file)
@@ -29,6 +29,7 @@
 #include <folly/futures/ManualExecutor.h>
 #include <folly/futures/DrivableExecutor.h>
 #include <folly/dynamic.h>
+#include <folly/Baton.h>
 #include <folly/MPMCQueue.h>
 
 #include <folly/io/async/EventBase.h>
@@ -535,12 +536,12 @@ TEST(Future, makeFuture) {
   EXPECT_EQ(42, makeFuture<float>(42).value());
 
   auto fun = [] { return 42; };
-  EXPECT_TYPE(makeFutureTry(fun), Future<int>);
-  EXPECT_EQ(42, makeFutureTry(fun).value());
+  EXPECT_TYPE(makeFutureWith(fun), Future<int>);
+  EXPECT_EQ(42, makeFutureWith(fun).value());
 
   auto failfun = []() -> int { throw eggs; };
-  EXPECT_TYPE(makeFutureTry(failfun), Future<int>);
-  EXPECT_THROW(makeFutureTry(failfun).value(), eggs_t);
+  EXPECT_TYPE(makeFutureWith(failfun), Future<int>);
+  EXPECT_THROW(makeFutureWith(failfun).value(), eggs_t);
 
   EXPECT_TYPE(makeFuture(), Future<void>);
 }
@@ -689,6 +690,36 @@ TEST(Future, unwrap) {
   EXPECT_EQ(7, f.value());
 }
 
+TEST(Future, stream) {
+  auto fn = [](vector<int> input, size_t window_size, size_t expect) {
+    auto res = reduce(
+      window(
+        input,
+        [](int i) { return makeFuture(i); },
+        2),
+      0,
+      [](int sum, const Try<int>& b) {
+        return sum + *b;
+      }).get();
+    EXPECT_EQ(expect, res);
+  };
+  {
+    // streaming 2 at a time
+    vector<int> input = {1, 2, 3};
+    fn(input, 2, 6);
+  }
+  {
+    // streaming 4 at a time
+    vector<int> input = {1, 2, 3};
+    fn(input, 4, 6);
+  }
+  {
+    // empty inpt
+    vector<int> input;
+    fn(input, 1, 0);
+  }
+}
+
 TEST(Future, collectAll) {
   // returns a vector variant
   {
@@ -698,7 +729,7 @@ TEST(Future, collectAll) {
     for (auto& p : promises)
       futures.push_back(p.getFuture());
 
-    auto allf = collectAll(futures.begin(), futures.end());
+    auto allf = collectAll(futures);
 
     random_shuffle(promises.begin(), promises.end());
     for (auto& p : promises) {
@@ -721,7 +752,7 @@ TEST(Future, collectAll) {
     for (auto& p : promises)
       futures.push_back(p.getFuture());
 
-    auto allf = collectAll(futures.begin(), futures.end());
+    auto allf = collectAll(futures);
 
 
     promises[0].setValue(42);
@@ -753,7 +784,7 @@ TEST(Future, collectAll) {
     for (auto& p : promises)
       futures.push_back(p.getFuture());
 
-    auto allf = collectAll(futures.begin(), futures.end())
+    auto allf = collectAll(futures)
       .then([](Try<vector<Try<void>>>&& ts) {
         for (auto& f : ts.value())
           f.value();
@@ -775,7 +806,7 @@ TEST(Future, collect) {
     for (auto& p : promises)
       futures.push_back(p.getFuture());
 
-    auto allf = collect(futures.begin(), futures.end());
+    auto allf = collect(futures);
 
     random_shuffle(promises.begin(), promises.end());
     for (auto& p : promises) {
@@ -797,7 +828,7 @@ TEST(Future, collect) {
     for (auto& p : promises)
       futures.push_back(p.getFuture());
 
-    auto allf = collect(futures.begin(), futures.end());
+    auto allf = collect(futures);
 
     random_shuffle(promises.begin(), promises.end());
     for (int i = 0; i < 10; i++) {
@@ -832,7 +863,7 @@ TEST(Future, collect) {
     for (auto& p : promises)
       futures.push_back(p.getFuture());
 
-    auto allf = collect(futures.begin(), futures.end());
+    auto allf = collect(futures);
 
     random_shuffle(promises.begin(), promises.end());
     for (auto& p : promises) {
@@ -851,7 +882,7 @@ TEST(Future, collect) {
     for (auto& p : promises)
       futures.push_back(p.getFuture());
 
-    auto allf = collect(futures.begin(), futures.end());
+    auto allf = collect(futures);
 
     random_shuffle(promises.begin(), promises.end());
     for (int i = 0; i < 10; i++) {
@@ -886,7 +917,7 @@ TEST(Future, collect) {
     for (auto& p : promises)
       futures.push_back(p.getFuture());
 
-    collect(futures.begin(), futures.end());
+    collect(futures);
   }
 
 }
@@ -909,7 +940,7 @@ TEST(Future, collectNotDefaultConstructible) {
   for (auto& p : promises)
     futures.push_back(p.getFuture());
 
-  auto allf = collect(futures.begin(), futures.end());
+  auto allf = collect(futures);
 
   for (auto i : indices) {
     EXPECT_FALSE(allf.isReady());
@@ -936,7 +967,7 @@ TEST(Future, collectAny) {
       EXPECT_FALSE(f.isReady());
     }
 
-    auto anyf = collectAny(futures.begin(), futures.end());
+    auto anyf = collectAny(futures);
 
     /* futures were moved in, so these are invalid now */
     EXPECT_FALSE(anyf.isReady());
@@ -964,7 +995,7 @@ TEST(Future, collectAny) {
       EXPECT_FALSE(f.isReady());
     }
 
-    auto anyf = collectAny(futures.begin(), futures.end());
+    auto anyf = collectAny(futures);
 
     EXPECT_FALSE(anyf.isReady());
 
@@ -981,7 +1012,7 @@ TEST(Future, collectAny) {
     for (auto& p : promises)
       futures.push_back(p.getFuture());
 
-    auto anyf = collectAny(futures.begin(), futures.end())
+    auto anyf = collectAny(futures)
       .then([](pair<size_t, Try<int>> p) {
         EXPECT_EQ(42, p.second.value());
       });
@@ -998,7 +1029,7 @@ TEST(when, already_completed) {
     for (int i = 0; i < 10; i++)
       fs.push_back(makeFuture());
 
-    collectAll(fs.begin(), fs.end())
+    collectAll(fs)
       .then([&](vector<Try<void>> ts) {
         EXPECT_EQ(fs.size(), ts.size());
       });
@@ -1008,7 +1039,7 @@ TEST(when, already_completed) {
     for (int i = 0; i < 10; i++)
       fs.push_back(makeFuture(i));
 
-    collectAny(fs.begin(), fs.end())
+    collectAny(fs)
       .then([&](pair<size_t, Try<int>> p) {
         EXPECT_EQ(p.first, p.second.value());
       });
@@ -1024,7 +1055,7 @@ TEST(when, collectN) {
 
   bool flag = false;
   size_t n = 3;
-  collectN(futures.begin(), futures.end(), n)
+  collectN(futures, n)
     .then([&](vector<pair<size_t, Try<void>>> v) {
       flag = true;
       EXPECT_EQ(n, v.size());
@@ -1055,7 +1086,7 @@ TEST(when, small_vector) {
     for (int i = 0; i < 10; i++)
       futures.push_back(makeFuture());
 
-    auto anyf = collectAny(futures.begin(), futures.end());
+    auto anyf = collectAny(futures);
   }
 
   {
@@ -1064,7 +1095,7 @@ TEST(when, small_vector) {
     for (int i = 0; i < 10; i++)
       futures.push_back(makeFuture());
 
-    auto allf = collectAll(futures.begin(), futures.end());
+    auto allf = collectAll(futures);
   }
 }
 
@@ -1110,7 +1141,7 @@ TEST(Future, collectAllVariadicReferences) {
 
 TEST(Future, collectAll_none) {
   vector<Future<int>> fs;
-  auto f = collectAll(fs.begin(), fs.end());
+  auto f = collectAll(fs);
   EXPECT_TRUE(f.isReady());
 }
 
@@ -1155,13 +1186,13 @@ TEST(Future, waitImmediate) {
   vector<Future<void>> v_f;
   v_f.push_back(makeFuture());
   v_f.push_back(makeFuture());
-  auto done_v_f = collectAll(v_f.begin(), v_f.end()).wait().value();
+  auto done_v_f = collectAll(v_f).wait().value();
   EXPECT_EQ(2, done_v_f.size());
 
   vector<Future<bool>> v_fb;
   v_fb.push_back(makeFuture(true));
   v_fb.push_back(makeFuture(false));
-  auto fut = collectAll(v_fb.begin(), v_fb.end());
+  auto fut = collectAll(v_fb);
   auto done_v_fb = std::move(fut.wait().value());
   EXPECT_EQ(2, done_v_fb.size());
 }
@@ -1261,7 +1292,7 @@ TEST(Future, waitWithDuration) {
   vector<Future<bool>> v_fb;
   v_fb.push_back(makeFuture(true));
   v_fb.push_back(makeFuture(false));
-  auto f = collectAll(v_fb.begin(), v_fb.end());
+  auto f = collectAll(v_fb);
   f.wait(milliseconds(1));
   EXPECT_TRUE(f.isReady());
   EXPECT_EQ(2, f.value().size());
@@ -1272,7 +1303,7 @@ TEST(Future, waitWithDuration) {
   Promise<bool> p2;
   v_fb.push_back(p1.getFuture());
   v_fb.push_back(p2.getFuture());
-  auto f = collectAll(v_fb.begin(), v_fb.end());
+  auto f = collectAll(v_fb);
   f.wait(milliseconds(1));
   EXPECT_FALSE(f.isReady());
   p1.setValue(true);
@@ -1484,7 +1515,7 @@ TEST(Future, t5506504) {
       for (auto& p : *promises) p.setValue();
     });
 
-    return collectAll(futures.begin(), futures.end());
+    return collectAll(futures);
   };
 
   fn().wait();
@@ -1690,7 +1721,7 @@ TEST(Reduce, Basic) {
   {
     auto fs = makeFutures(0);
 
-    Future<double> f1 = reduce(fs.begin(), fs.end(), 1.2,
+    Future<double> f1 = reduce(fs, 1.2,
       [](double a, Try<int>&& b){
         return a + *b + 0.1;
       });
@@ -1701,7 +1732,7 @@ TEST(Reduce, Basic) {
   {
     auto fs = makeFutures(1);
 
-    Future<double> f1 = reduce(fs.begin(), fs.end(), 0.0,
+    Future<double> f1 = reduce(fs, 0.0,
       [](double a, Try<int>&& b){
         return a + *b + 0.1;
       });
@@ -1712,7 +1743,7 @@ TEST(Reduce, Basic) {
   {
     auto fs = makeFutures(3);
 
-    Future<double> f1 = reduce(fs.begin(), fs.end(), 0.0,
+    Future<double> f1 = reduce(fs, 0.0,
       [](double a, Try<int>&& b){
         return a + *b + 0.1;
       });
@@ -1723,7 +1754,7 @@ TEST(Reduce, Basic) {
   {
     auto fs = makeFutures(3);
 
-    Future<double> f1 = reduce(fs.begin(), fs.end(), 0.0,
+    Future<double> f1 = reduce(fs, 0.0,
       [](double a, int&& b){
         return a + b + 0.1;
       });
@@ -1734,7 +1765,7 @@ TEST(Reduce, Basic) {
   {
     auto fs = makeFutures(3);
 
-    Future<double> f2 = reduce(fs.begin(), fs.end(), 0.0,
+    Future<double> f2 = reduce(fs, 0.0,
       [](double a, Try<int>&& b){
         return makeFuture<double>(a + *b + 0.1);
       });
@@ -1745,7 +1776,7 @@ TEST(Reduce, Basic) {
   {
     auto fs = makeFutures(3);
 
-    Future<double> f2 = reduce(fs.begin(), fs.end(), 0.0,
+    Future<double> f2 = reduce(fs, 0.0,
       [](double a, int&& b){
         return makeFuture<double>(a + b + 0.1);
       });
@@ -1753,6 +1784,83 @@ TEST(Reduce, Basic) {
   }
 }
 
+TEST(Reduce, Chain) {
+  auto makeFutures = [](int count) {
+    std::vector<Future<int>> fs;
+    for (int i = 1; i <= count; ++i) {
+      fs.emplace_back(makeFuture(i));
+    }
+    return fs;
+  };
+
+  {
+    auto f = collectAll(makeFutures(3)).reduce(0, [](int a, Try<int>&& b){
+      return a + *b;
+    });
+    EXPECT_EQ(6, f.get());
+  }
+  {
+    auto f = collect(makeFutures(3)).reduce(0, [](int a, int&& b){
+      return a + b;
+    });
+    EXPECT_EQ(6, f.get());
+  }
+}
+
+TEST(Reduce, Streaming) {
+  {
+    std::vector<Future<int>> fs;
+    fs.push_back(makeFuture(1));
+    fs.push_back(makeFuture(2));
+    fs.push_back(makeFuture(3));
+
+    Future<double> f = unorderedReduce(fs.begin(), fs.end(), 0.0,
+      [](double a, int&& b){
+        return double(b);
+      });
+    EXPECT_EQ(3.0, f.get());
+  }
+  {
+    Promise<int> p1;
+    Promise<int> p2;
+    Promise<int> p3;
+
+    std::vector<Future<int>> fs;
+    fs.push_back(p1.getFuture());
+    fs.push_back(p2.getFuture());
+    fs.push_back(p3.getFuture());
+
+    Future<double> f = unorderedReduce(fs.begin(), fs.end(), 0.0,
+      [](double a, int&& b){
+        return double(b);
+      });
+    p3.setValue(3);
+    p2.setValue(2);
+    p1.setValue(1);
+    EXPECT_EQ(1.0, f.get());
+  }
+}
+
+TEST(Reduce, StreamingException) {
+  Promise<int> p1;
+  Promise<int> p2;
+  Promise<int> p3;
+
+  std::vector<Future<int>> fs;
+  fs.push_back(p1.getFuture());
+  fs.push_back(p2.getFuture());
+  fs.push_back(p3.getFuture());
+
+  Future<double> f = unorderedReduce(fs.begin(), fs.end(), 0.0,
+    [](double a, int&& b){
+      return b + 0.0;
+    });
+  p3.setValue(3);
+  p2.setException(exception_wrapper(std::runtime_error("blah")));
+  p1.setValue(1);
+  EXPECT_THROW(f.get(), std::runtime_error);
+}
+
 TEST(Map, Basic) {
   Promise<int> p1;
   Promise<int> p2;
@@ -1764,7 +1872,7 @@ TEST(Map, Basic) {
   fs.push_back(p3.getFuture());
 
   int c = 0;
-  auto fs2 = futures::map(fs.begin(), fs.end(), [&](int i){
+  std::vector<Future<void>> fs2 = futures::map(fs, [&](int i){
     c += i;
   });
 
@@ -1776,5 +1884,5 @@ TEST(Map, Basic) {
   p1.setValue(1);
   EXPECT_EQ(3, c);
 
-  EXPECT_TRUE(collect(fs2.begin(), fs2.end()).isReady());
+  EXPECT_TRUE(collect(fs2).isReady());
 }