(Wangle) unorderedReduce
[folly.git] / folly / futures / test / FutureTest.cpp
index 23c72f8ccd19aac33a864a82ddec54a6103b0534..bff61b0c0b4c1ef78eb81e24f7296cc47351f73f 100644 (file)
@@ -1,5 +1,5 @@
 /*
- * Copyright 2014 Facebook, Inc.
+ * Copyright 2015 Facebook, Inc.
  *
  * Licensed under the Apache License, Version 2.0 (the "License");
  * you may not use this file except in compliance with the License.
@@ -28,6 +28,8 @@
 #include <folly/futures/Future.h>
 #include <folly/futures/ManualExecutor.h>
 #include <folly/futures/DrivableExecutor.h>
+#include <folly/dynamic.h>
+#include <folly/Baton.h>
 #include <folly/MPMCQueue.h>
 
 #include <folly/io/async/EventBase.h>
@@ -63,7 +65,9 @@ class ThreadExecutor : public Executor {
 
  public:
   explicit ThreadExecutor(size_t n = 1024)
-    : funcs(n), worker(std::bind(&ThreadExecutor::work, this)) {}
+    : funcs(n) {
+    worker = std::thread(std::bind(&ThreadExecutor::work, this));
+  }
 
   ~ThreadExecutor() {
     done = true;
@@ -88,7 +92,7 @@ static eggs_t eggs("eggs");
 TEST(Future, coreSize) {
   // If this number goes down, it's fine!
   // If it goes up, please seek professional advice ;-)
-  EXPECT_EQ(128, sizeof(detail::Core<void>));
+  EXPECT_EQ(192, sizeof(detail::Core<void>));
 }
 
 // Future
@@ -257,6 +261,66 @@ TEST(Future, onError) {
       .onError([&] (eggs_t& e) { throw e; return makeFuture<int>(-1); });
     EXPECT_THROW(f.value(), eggs_t);
   }
+
+  // exception_wrapper, return Future<T>
+  {
+    auto f = makeFuture()
+      .then([] { throw eggs; })
+      .onError([&] (exception_wrapper e) { flag(); return makeFuture(); });
+    EXPECT_FLAG();
+    EXPECT_NO_THROW(f.value());
+  }
+
+  // exception_wrapper, return Future<T> but throw
+  {
+    auto f = makeFuture()
+      .then([]{ throw eggs; return 0; })
+      .onError([&] (exception_wrapper e) {
+        flag();
+        throw eggs;
+        return makeFuture<int>(-1);
+      });
+    EXPECT_FLAG();
+    EXPECT_THROW(f.value(), eggs_t);
+  }
+
+  // exception_wrapper, return T
+  {
+    auto f = makeFuture()
+      .then([]{ throw eggs; return 0; })
+      .onError([&] (exception_wrapper e) {
+        flag();
+        return -1;
+      });
+    EXPECT_FLAG();
+    EXPECT_EQ(-1, f.value());
+  }
+
+  // exception_wrapper, return T but throw
+  {
+    auto f = makeFuture()
+      .then([]{ throw eggs; return 0; })
+      .onError([&] (exception_wrapper e) {
+        flag();
+        throw eggs;
+        return -1;
+      });
+    EXPECT_FLAG();
+    EXPECT_THROW(f.value(), eggs_t);
+  }
+
+  // const exception_wrapper&
+  {
+    auto f = makeFuture()
+      .then([] { throw eggs; })
+      .onError([&] (const exception_wrapper& e) {
+        flag();
+        return makeFuture();
+      });
+    EXPECT_FLAG();
+    EXPECT_NO_THROW(f.value());
+  }
+
 }
 
 TEST(Future, try) {
@@ -472,12 +536,12 @@ TEST(Future, makeFuture) {
   EXPECT_EQ(42, makeFuture<float>(42).value());
 
   auto fun = [] { return 42; };
-  EXPECT_TYPE(makeFutureTry(fun), Future<int>);
-  EXPECT_EQ(42, makeFutureTry(fun).value());
+  EXPECT_TYPE(makeFutureWith(fun), Future<int>);
+  EXPECT_EQ(42, makeFutureWith(fun).value());
 
   auto failfun = []() -> int { throw eggs; };
-  EXPECT_TYPE(makeFutureTry(failfun), Future<int>);
-  EXPECT_THROW(makeFutureTry(failfun).value(), eggs_t);
+  EXPECT_TYPE(makeFutureWith(failfun), Future<int>);
+  EXPECT_THROW(makeFutureWith(failfun).value(), eggs_t);
 
   EXPECT_TYPE(makeFuture(), Future<void>);
 }
@@ -554,17 +618,17 @@ TEST(Promise, setException) {
   }
 }
 
-TEST(Promise, fulfil) {
+TEST(Promise, setWith) {
   {
     Promise<int> p;
     auto f = p.getFuture();
-    p.fulfil([] { return 42; });
+    p.setWith([] { return 42; });
     EXPECT_EQ(42, f.value());
   }
   {
     Promise<int> p;
     auto f = p.getFuture();
-    p.fulfil([]() -> int { throw eggs; });
+    p.setWith([]() -> int { throw eggs; });
     EXPECT_THROW(f.value(), eggs_t);
   }
 }
@@ -626,7 +690,37 @@ TEST(Future, unwrap) {
   EXPECT_EQ(7, f.value());
 }
 
-TEST(Future, whenAll) {
+TEST(Future, stream) {
+  auto fn = [](vector<int> input, size_t window_size, size_t expect) {
+    auto res = reduce(
+      window(
+        input,
+        [](int i) { return makeFuture(i); },
+        2),
+      0,
+      [](int sum, const Try<int>& b) {
+        return sum + *b;
+      }).get();
+    EXPECT_EQ(expect, res);
+  };
+  {
+    // streaming 2 at a time
+    vector<int> input = {1, 2, 3};
+    fn(input, 2, 6);
+  }
+  {
+    // streaming 4 at a time
+    vector<int> input = {1, 2, 3};
+    fn(input, 4, 6);
+  }
+  {
+    // empty inpt
+    vector<int> input;
+    fn(input, 1, 0);
+  }
+}
+
+TEST(Future, collectAll) {
   // returns a vector variant
   {
     vector<Promise<int>> promises(10);
@@ -635,7 +729,7 @@ TEST(Future, whenAll) {
     for (auto& p : promises)
       futures.push_back(p.getFuture());
 
-    auto allf = whenAll(futures.begin(), futures.end());
+    auto allf = collectAll(futures);
 
     random_shuffle(promises.begin(), promises.end());
     for (auto& p : promises) {
@@ -658,7 +752,7 @@ TEST(Future, whenAll) {
     for (auto& p : promises)
       futures.push_back(p.getFuture());
 
-    auto allf = whenAll(futures.begin(), futures.end());
+    auto allf = collectAll(futures);
 
 
     promises[0].setValue(42);
@@ -690,7 +784,7 @@ TEST(Future, whenAll) {
     for (auto& p : promises)
       futures.push_back(p.getFuture());
 
-    auto allf = whenAll(futures.begin(), futures.end())
+    auto allf = collectAll(futures)
       .then([](Try<vector<Try<void>>>&& ts) {
         for (auto& f : ts.value())
           f.value();
@@ -703,8 +797,165 @@ TEST(Future, whenAll) {
   }
 }
 
+TEST(Future, collect) {
+  // success case
+  {
+    vector<Promise<int>> promises(10);
+    vector<Future<int>> futures;
+
+    for (auto& p : promises)
+      futures.push_back(p.getFuture());
+
+    auto allf = collect(futures);
+
+    random_shuffle(promises.begin(), promises.end());
+    for (auto& p : promises) {
+      EXPECT_FALSE(allf.isReady());
+      p.setValue(42);
+    }
+
+    EXPECT_TRUE(allf.isReady());
+    for (auto i : allf.value()) {
+      EXPECT_EQ(42, i);
+    }
+  }
+
+  // failure case
+  {
+    vector<Promise<int>> promises(10);
+    vector<Future<int>> futures;
+
+    for (auto& p : promises)
+      futures.push_back(p.getFuture());
+
+    auto allf = collect(futures);
+
+    random_shuffle(promises.begin(), promises.end());
+    for (int i = 0; i < 10; i++) {
+      if (i < 5) {
+        // everthing goes well so far...
+        EXPECT_FALSE(allf.isReady());
+        promises[i].setValue(42);
+      } else if (i == 5) {
+        // short circuit with an exception
+        EXPECT_FALSE(allf.isReady());
+        promises[i].setException(eggs);
+        EXPECT_TRUE(allf.isReady());
+      } else if (i < 8) {
+        // don't blow up on further values
+        EXPECT_TRUE(allf.isReady());
+        promises[i].setValue(42);
+      } else {
+        // don't blow up on further exceptions
+        EXPECT_TRUE(allf.isReady());
+        promises[i].setException(eggs);
+      }
+    }
+
+    EXPECT_THROW(allf.value(), eggs_t);
+  }
+
+  // void futures success case
+  {
+    vector<Promise<void>> promises(10);
+    vector<Future<void>> futures;
+
+    for (auto& p : promises)
+      futures.push_back(p.getFuture());
+
+    auto allf = collect(futures);
+
+    random_shuffle(promises.begin(), promises.end());
+    for (auto& p : promises) {
+      EXPECT_FALSE(allf.isReady());
+      p.setValue();
+    }
+
+    EXPECT_TRUE(allf.isReady());
+  }
+
+  // void futures failure case
+  {
+    vector<Promise<void>> promises(10);
+    vector<Future<void>> futures;
+
+    for (auto& p : promises)
+      futures.push_back(p.getFuture());
+
+    auto allf = collect(futures);
+
+    random_shuffle(promises.begin(), promises.end());
+    for (int i = 0; i < 10; i++) {
+      if (i < 5) {
+        // everthing goes well so far...
+        EXPECT_FALSE(allf.isReady());
+        promises[i].setValue();
+      } else if (i == 5) {
+        // short circuit with an exception
+        EXPECT_FALSE(allf.isReady());
+        promises[i].setException(eggs);
+        EXPECT_TRUE(allf.isReady());
+      } else if (i < 8) {
+        // don't blow up on further values
+        EXPECT_TRUE(allf.isReady());
+        promises[i].setValue();
+      } else {
+        // don't blow up on further exceptions
+        EXPECT_TRUE(allf.isReady());
+        promises[i].setException(eggs);
+      }
+    }
+
+    EXPECT_THROW(allf.value(), eggs_t);
+  }
+
+  // move only compiles
+  {
+    vector<Promise<unique_ptr<int>>> promises(10);
+    vector<Future<unique_ptr<int>>> futures;
+
+    for (auto& p : promises)
+      futures.push_back(p.getFuture());
+
+    collect(futures);
+  }
+
+}
+
+struct NotDefaultConstructible {
+  NotDefaultConstructible() = delete;
+  NotDefaultConstructible(int arg) : i(arg) {}
+  int i;
+};
+
+// We have a specialized implementation for non-default-constructible objects
+// Ensure that it works and preserves order
+TEST(Future, collectNotDefaultConstructible) {
+  vector<Promise<NotDefaultConstructible>> promises(10);
+  vector<Future<NotDefaultConstructible>> futures;
+  vector<int> indices(10);
+  std::iota(indices.begin(), indices.end(), 0);
+  random_shuffle(indices.begin(), indices.end());
+
+  for (auto& p : promises)
+    futures.push_back(p.getFuture());
+
+  auto allf = collect(futures);
+
+  for (auto i : indices) {
+    EXPECT_FALSE(allf.isReady());
+    promises[i].setValue(NotDefaultConstructible(i));
+  }
+
+  EXPECT_TRUE(allf.isReady());
+  int i = 0;
+  for (auto val : allf.value()) {
+    EXPECT_EQ(i, val.i);
+    i++;
+  }
+}
 
-TEST(Future, whenAny) {
+TEST(Future, collectAny) {
   {
     vector<Promise<int>> promises(10);
     vector<Future<int>> futures;
@@ -716,7 +967,7 @@ TEST(Future, whenAny) {
       EXPECT_FALSE(f.isReady());
     }
 
-    auto anyf = whenAny(futures.begin(), futures.end());
+    auto anyf = collectAny(futures);
 
     /* futures were moved in, so these are invalid now */
     EXPECT_FALSE(anyf.isReady());
@@ -744,7 +995,7 @@ TEST(Future, whenAny) {
       EXPECT_FALSE(f.isReady());
     }
 
-    auto anyf = whenAny(futures.begin(), futures.end());
+    auto anyf = collectAny(futures);
 
     EXPECT_FALSE(anyf.isReady());
 
@@ -761,7 +1012,7 @@ TEST(Future, whenAny) {
     for (auto& p : promises)
       futures.push_back(p.getFuture());
 
-    auto anyf = whenAny(futures.begin(), futures.end())
+    auto anyf = collectAny(futures)
       .then([](pair<size_t, Try<int>> p) {
         EXPECT_EQ(42, p.second.value());
       });
@@ -778,7 +1029,7 @@ TEST(when, already_completed) {
     for (int i = 0; i < 10; i++)
       fs.push_back(makeFuture());
 
-    whenAll(fs.begin(), fs.end())
+    collectAll(fs)
       .then([&](vector<Try<void>> ts) {
         EXPECT_EQ(fs.size(), ts.size());
       });
@@ -788,14 +1039,14 @@ TEST(when, already_completed) {
     for (int i = 0; i < 10; i++)
       fs.push_back(makeFuture(i));
 
-    whenAny(fs.begin(), fs.end())
+    collectAny(fs)
       .then([&](pair<size_t, Try<int>> p) {
         EXPECT_EQ(p.first, p.second.value());
       });
   }
 }
 
-TEST(when, whenN) {
+TEST(when, collectN) {
   vector<Promise<void>> promises(10);
   vector<Future<void>> futures;
 
@@ -804,7 +1055,7 @@ TEST(when, whenN) {
 
   bool flag = false;
   size_t n = 3;
-  whenN(futures.begin(), futures.end(), n)
+  collectN(futures, n)
     .then([&](vector<pair<size_t, Try<void>>> v) {
       flag = true;
       EXPECT_EQ(n, v.size());
@@ -835,7 +1086,7 @@ TEST(when, small_vector) {
     for (int i = 0; i < 10; i++)
       futures.push_back(makeFuture());
 
-    auto anyf = whenAny(futures.begin(), futures.end());
+    auto anyf = collectAny(futures);
   }
 
   {
@@ -844,17 +1095,17 @@ TEST(when, small_vector) {
     for (int i = 0; i < 10; i++)
       futures.push_back(makeFuture());
 
-    auto allf = whenAll(futures.begin(), futures.end());
+    auto allf = collectAll(futures);
   }
 }
 
-TEST(Future, whenAllVariadic) {
+TEST(Future, collectAllVariadic) {
   Promise<bool> pb;
   Promise<int> pi;
   Future<bool> fb = pb.getFuture();
   Future<int> fi = pi.getFuture();
   bool flag = false;
-  whenAll(std::move(fb), std::move(fi))
+  collectAll(std::move(fb), std::move(fi))
     .then([&](std::tuple<Try<bool>, Try<int>> tup) {
       flag = true;
       EXPECT_TRUE(std::get<0>(tup).hasValue());
@@ -868,13 +1119,13 @@ TEST(Future, whenAllVariadic) {
   EXPECT_TRUE(flag);
 }
 
-TEST(Future, whenAllVariadicReferences) {
+TEST(Future, collectAllVariadicReferences) {
   Promise<bool> pb;
   Promise<int> pi;
   Future<bool> fb = pb.getFuture();
   Future<int> fi = pi.getFuture();
   bool flag = false;
-  whenAll(fb, fi)
+  collectAll(fb, fi)
     .then([&](std::tuple<Try<bool>, Try<int>> tup) {
       flag = true;
       EXPECT_TRUE(std::get<0>(tup).hasValue());
@@ -888,9 +1139,9 @@ TEST(Future, whenAllVariadicReferences) {
   EXPECT_TRUE(flag);
 }
 
-TEST(Future, whenAll_none) {
+TEST(Future, collectAll_none) {
   vector<Future<int>> fs;
-  auto f = whenAll(fs.begin(), fs.end());
+  auto f = collectAll(fs);
   EXPECT_TRUE(f.isReady());
 }
 
@@ -935,13 +1186,13 @@ TEST(Future, waitImmediate) {
   vector<Future<void>> v_f;
   v_f.push_back(makeFuture());
   v_f.push_back(makeFuture());
-  auto done_v_f = whenAll(v_f.begin(), v_f.end()).wait().value();
+  auto done_v_f = collectAll(v_f).wait().value();
   EXPECT_EQ(2, done_v_f.size());
 
   vector<Future<bool>> v_fb;
   v_fb.push_back(makeFuture(true));
   v_fb.push_back(makeFuture(false));
-  auto fut = whenAll(v_fb.begin(), v_fb.end());
+  auto fut = collectAll(v_fb);
   auto done_v_fb = std::move(fut.wait().value());
   EXPECT_EQ(2, done_v_fb.size());
 }
@@ -1041,7 +1292,7 @@ TEST(Future, waitWithDuration) {
   vector<Future<bool>> v_fb;
   v_fb.push_back(makeFuture(true));
   v_fb.push_back(makeFuture(false));
-  auto f = whenAll(v_fb.begin(), v_fb.end());
+  auto f = collectAll(v_fb);
   f.wait(milliseconds(1));
   EXPECT_TRUE(f.isReady());
   EXPECT_EQ(2, f.value().size());
@@ -1052,7 +1303,7 @@ TEST(Future, waitWithDuration) {
   Promise<bool> p2;
   v_fb.push_back(p1.getFuture());
   v_fb.push_back(p2.getFuture());
-  auto f = whenAll(v_fb.begin(), v_fb.end());
+  auto f = collectAll(v_fb);
   f.wait(milliseconds(1));
   EXPECT_FALSE(f.isReady());
   p1.setValue(true);
@@ -1177,7 +1428,7 @@ TEST(Future, getFuture_after_setValue) {
 
 TEST(Future, getFuture_after_setException) {
   Promise<void> p;
-  p.fulfil([]() -> void { throw std::logic_error("foo"); });
+  p.setWith([]() -> void { throw std::logic_error("foo"); });
   EXPECT_THROW(p.getFuture().value(), std::logic_error);
 }
 
@@ -1239,7 +1490,7 @@ TEST(Future, context) {
 
   EXPECT_EQ(nullptr, RequestContext::get()->getContextData("test"));
 
-  // Fulfil the promise
+  // Fulfill the promise
   p.setValue();
 }
 
@@ -1264,7 +1515,7 @@ TEST(Future, t5506504) {
       for (auto& p : *promises) p.setValue();
     });
 
-    return whenAll(futures.begin(), futures.end());
+    return collectAll(futures);
   };
 
   fn().wait();
@@ -1291,7 +1542,7 @@ TEST(Future, CircularDependencySharedPtrSelfReset) {
 
   ptr.reset();
 
-  promise.fulfil([]{return 1l;});
+  promise.setWith([]{return 1l;});
 }
 
 TEST(Future, Constructor) {
@@ -1308,3 +1559,330 @@ TEST(Future, ImplicitConstructor) {
   // following implicit conversion to work:
   //auto f2 = []() -> Future<void> { }();
 }
+
+TEST(Future, thenDynamic) {
+  // folly::dynamic has a constructor that takes any T, this test makes
+  // sure that we call the then lambda with folly::dynamic and not
+  // Try<folly::dynamic> because that then fails to compile
+  Promise<folly::dynamic> p;
+  Future<folly::dynamic> f = p.getFuture().then(
+      [](const folly::dynamic& d) {
+        return folly::dynamic(d.asInt() + 3);
+      }
+  );
+  p.setValue(2);
+  EXPECT_EQ(f.get(), 5);
+}
+
+TEST(Future, via_then_get_was_racy) {
+  ThreadExecutor x;
+  std::unique_ptr<int> val = folly::via(&x)
+    .then([] { return folly::make_unique<int>(42); })
+    .get();
+  ASSERT_TRUE(!!val);
+  EXPECT_EQ(42, *val);
+}
+
+TEST(Future, ensure) {
+  size_t count = 0;
+  auto cob = [&]{ count++; };
+  auto f = makeFuture(42)
+    .ensure(cob)
+    .then([](int) { throw std::runtime_error("ensure"); })
+    .ensure(cob);
+
+  EXPECT_THROW(f.get(), std::runtime_error);
+  EXPECT_EQ(2, count);
+}
+
+TEST(Future, willEqual) {
+    //both p1 and p2 already fulfilled
+    {
+    Promise<int> p1;
+    Promise<int> p2;
+    p1.setValue(27);
+    p2.setValue(27);
+    auto f1 = p1.getFuture();
+    auto f2 = p2.getFuture();
+    EXPECT_TRUE(f1.willEqual(f2).get());
+    }{
+    Promise<int> p1;
+    Promise<int> p2;
+    p1.setValue(27);
+    p2.setValue(36);
+    auto f1 = p1.getFuture();
+    auto f2 = p2.getFuture();
+    EXPECT_FALSE(f1.willEqual(f2).get());
+    }
+    //both p1 and p2 not yet fulfilled
+    {
+    Promise<int> p1;
+    Promise<int> p2;
+    auto f1 = p1.getFuture();
+    auto f2 = p2.getFuture();
+    auto f3 = f1.willEqual(f2);
+    p1.setValue(27);
+    p2.setValue(27);
+    EXPECT_TRUE(f3.get());
+    }{
+    Promise<int> p1;
+    Promise<int> p2;
+    auto f1 = p1.getFuture();
+    auto f2 = p2.getFuture();
+    auto f3 = f1.willEqual(f2);
+    p1.setValue(27);
+    p2.setValue(36);
+    EXPECT_FALSE(f3.get());
+    }
+    //p1 already fulfilled, p2 not yet fulfilled
+    {
+    Promise<int> p1;
+    Promise<int> p2;
+    p1.setValue(27);
+    auto f1 = p1.getFuture();
+    auto f2 = p2.getFuture();
+    auto f3 = f1.willEqual(f2);
+    p2.setValue(27);
+    EXPECT_TRUE(f3.get());
+    }{
+    Promise<int> p1;
+    Promise<int> p2;
+    p1.setValue(27);
+    auto f1 = p1.getFuture();
+    auto f2 = p2.getFuture();
+    auto f3 = f1.willEqual(f2);
+    p2.setValue(36);
+    EXPECT_FALSE(f3.get());
+    }
+    //p2 already fulfilled, p1 not yet fulfilled
+    {
+    Promise<int> p1;
+    Promise<int> p2;
+    p2.setValue(27);
+    auto f1 = p1.getFuture();
+    auto f2 = p2.getFuture();
+    auto f3 = f1.willEqual(f2);
+    p1.setValue(27);
+    EXPECT_TRUE(f3.get());
+    }{
+    Promise<int> p1;
+    Promise<int> p2;
+    p2.setValue(36);
+    auto f1 = p1.getFuture();
+    auto f2 = p2.getFuture();
+    auto f3 = f1.willEqual(f2);
+    p1.setValue(27);
+    EXPECT_FALSE(f3.get());
+    }
+}
+
+// Unwrap tests.
+
+// A simple scenario for the unwrap call, when the promise was fulfilled
+// before calling to unwrap.
+TEST(Future, Unwrap_SimpleScenario) {
+  Future<int> encapsulated_future = makeFuture(5484);
+  Future<Future<int>> future = makeFuture(std::move(encapsulated_future));
+  EXPECT_EQ(5484, future.unwrap().value());
+}
+
+// Makes sure that unwrap() works when chaning Future's commands.
+TEST(Future, Unwrap_ChainCommands) {
+  Future<Future<int>> future = makeFuture(makeFuture(5484));
+  auto unwrapped = future.unwrap().then([](int i){ return i; });
+  EXPECT_EQ(5484, unwrapped.value());
+}
+
+// Makes sure that the unwrap call also works when the promise was not yet
+// fulfilled, and that the returned Future<T> becomes ready once the promise
+// is fulfilled.
+TEST(Future, Unwrap_FutureNotReady) {
+  Promise<Future<int>> p;
+  Future<Future<int>> future = p.getFuture();
+  Future<int> unwrapped = future.unwrap();
+  // Sanity - should not be ready before the promise is fulfilled.
+  ASSERT_FALSE(unwrapped.isReady());
+  // Fulfill the promise and make sure the unwrapped future is now ready.
+  p.setValue(makeFuture(5484));
+  ASSERT_TRUE(unwrapped.isReady());
+  EXPECT_EQ(5484, unwrapped.value());
+}
+
+TEST(Reduce, Basic) {
+  auto makeFutures = [](int count) {
+    std::vector<Future<int>> fs;
+    for (int i = 1; i <= count; ++i) {
+      fs.emplace_back(makeFuture(i));
+    }
+    return fs;
+  };
+
+  // Empty (Try)
+  {
+    auto fs = makeFutures(0);
+
+    Future<double> f1 = reduce(fs, 1.2,
+      [](double a, Try<int>&& b){
+        return a + *b + 0.1;
+      });
+    EXPECT_EQ(1.2, f1.get());
+  }
+
+  // One (Try)
+  {
+    auto fs = makeFutures(1);
+
+    Future<double> f1 = reduce(fs, 0.0,
+      [](double a, Try<int>&& b){
+        return a + *b + 0.1;
+      });
+    EXPECT_EQ(1.1, f1.get());
+  }
+
+  // Returning values (Try)
+  {
+    auto fs = makeFutures(3);
+
+    Future<double> f1 = reduce(fs, 0.0,
+      [](double a, Try<int>&& b){
+        return a + *b + 0.1;
+      });
+    EXPECT_EQ(6.3, f1.get());
+  }
+
+  // Returning values
+  {
+    auto fs = makeFutures(3);
+
+    Future<double> f1 = reduce(fs, 0.0,
+      [](double a, int&& b){
+        return a + b + 0.1;
+      });
+    EXPECT_EQ(6.3, f1.get());
+  }
+
+  // Returning futures (Try)
+  {
+    auto fs = makeFutures(3);
+
+    Future<double> f2 = reduce(fs, 0.0,
+      [](double a, Try<int>&& b){
+        return makeFuture<double>(a + *b + 0.1);
+      });
+    EXPECT_EQ(6.3, f2.get());
+  }
+
+  // Returning futures
+  {
+    auto fs = makeFutures(3);
+
+    Future<double> f2 = reduce(fs, 0.0,
+      [](double a, int&& b){
+        return makeFuture<double>(a + b + 0.1);
+      });
+    EXPECT_EQ(6.3, f2.get());
+  }
+}
+
+TEST(Reduce, Chain) {
+  auto makeFutures = [](int count) {
+    std::vector<Future<int>> fs;
+    for (int i = 1; i <= count; ++i) {
+      fs.emplace_back(makeFuture(i));
+    }
+    return fs;
+  };
+
+  {
+    auto f = collectAll(makeFutures(3)).reduce(0, [](int a, Try<int>&& b){
+      return a + *b;
+    });
+    EXPECT_EQ(6, f.get());
+  }
+  {
+    auto f = collect(makeFutures(3)).reduce(0, [](int a, int&& b){
+      return a + b;
+    });
+    EXPECT_EQ(6, f.get());
+  }
+}
+
+TEST(Reduce, Streaming) {
+  {
+    std::vector<Future<int>> fs;
+    fs.push_back(makeFuture(1));
+    fs.push_back(makeFuture(2));
+    fs.push_back(makeFuture(3));
+
+    Future<double> f = unorderedReduce(fs.begin(), fs.end(), 0.0,
+      [](double a, int&& b){
+        return double(b);
+      });
+    EXPECT_EQ(3.0, f.get());
+  }
+  {
+    Promise<int> p1;
+    Promise<int> p2;
+    Promise<int> p3;
+
+    std::vector<Future<int>> fs;
+    fs.push_back(p1.getFuture());
+    fs.push_back(p2.getFuture());
+    fs.push_back(p3.getFuture());
+
+    Future<double> f = unorderedReduce(fs.begin(), fs.end(), 0.0,
+      [](double a, int&& b){
+        return double(b);
+      });
+    p3.setValue(3);
+    p2.setValue(2);
+    p1.setValue(1);
+    EXPECT_EQ(1.0, f.get());
+  }
+}
+
+TEST(Reduce, StreamingException) {
+  Promise<int> p1;
+  Promise<int> p2;
+  Promise<int> p3;
+
+  std::vector<Future<int>> fs;
+  fs.push_back(p1.getFuture());
+  fs.push_back(p2.getFuture());
+  fs.push_back(p3.getFuture());
+
+  Future<double> f = unorderedReduce(fs.begin(), fs.end(), 0.0,
+    [](double a, int&& b){
+      return b + 0.0;
+    });
+  p3.setValue(3);
+  p2.setException(exception_wrapper(std::runtime_error("blah")));
+  p1.setValue(1);
+  EXPECT_THROW(f.get(), std::runtime_error);
+}
+
+TEST(Map, Basic) {
+  Promise<int> p1;
+  Promise<int> p2;
+  Promise<int> p3;
+
+  std::vector<Future<int>> fs;
+  fs.push_back(p1.getFuture());
+  fs.push_back(p2.getFuture());
+  fs.push_back(p3.getFuture());
+
+  int c = 0;
+  std::vector<Future<void>> fs2 = futures::map(fs, [&](int i){
+    c += i;
+  });
+
+  // Ensure we call the callbacks as the futures complete regardless of order
+  p2.setValue(1);
+  EXPECT_EQ(1, c);
+  p3.setValue(1);
+  EXPECT_EQ(2, c);
+  p1.setValue(1);
+  EXPECT_EQ(3, c);
+
+  EXPECT_TRUE(collect(fs2).isReady());
+}