(Wangle) unorderedReduce
[folly.git] / folly / futures / test / FutureTest.cpp
index cbb944727cd3f2c7782bae3f35c1c2b0123628c8..bff61b0c0b4c1ef78eb81e24f7296cc47351f73f 100644 (file)
@@ -1,5 +1,5 @@
 /*
- * Copyright 2014 Facebook, Inc.
+ * Copyright 2015 Facebook, Inc.
  *
  * Licensed under the Apache License, Version 2.0 (the "License");
  * you may not use this file except in compliance with the License.
 #include <folly/Executor.h>
 #include <folly/futures/Future.h>
 #include <folly/futures/ManualExecutor.h>
+#include <folly/futures/DrivableExecutor.h>
+#include <folly/dynamic.h>
+#include <folly/Baton.h>
 #include <folly/MPMCQueue.h>
 
+#include <folly/io/async/EventBase.h>
 #include <folly/io/async/Request.h>
 
 using namespace folly;
@@ -36,6 +40,7 @@ using std::pair;
 using std::string;
 using std::unique_ptr;
 using std::vector;
+using std::chrono::milliseconds;
 
 #define EXPECT_TYPE(x, T) \
   EXPECT_TRUE((std::is_same<decltype(x), T>::value))
@@ -60,7 +65,9 @@ class ThreadExecutor : public Executor {
 
  public:
   explicit ThreadExecutor(size_t n = 1024)
-    : funcs(n), worker(std::bind(&ThreadExecutor::work, this)) {}
+    : funcs(n) {
+    worker = std::thread(std::bind(&ThreadExecutor::work, this));
+  }
 
   ~ThreadExecutor() {
     done = true;
@@ -77,9 +84,17 @@ class ThreadExecutor : public Executor {
   }
 };
 
-typedef WangleException eggs_t;
+typedef FutureException eggs_t;
 static eggs_t eggs("eggs");
 
+// Core
+
+TEST(Future, coreSize) {
+  // If this number goes down, it's fine!
+  // If it goes up, please seek professional advice ;-)
+  EXPECT_EQ(192, sizeof(detail::Core<void>));
+}
+
 // Future
 
 TEST(Future, onError) {
@@ -246,6 +261,66 @@ TEST(Future, onError) {
       .onError([&] (eggs_t& e) { throw e; return makeFuture<int>(-1); });
     EXPECT_THROW(f.value(), eggs_t);
   }
+
+  // exception_wrapper, return Future<T>
+  {
+    auto f = makeFuture()
+      .then([] { throw eggs; })
+      .onError([&] (exception_wrapper e) { flag(); return makeFuture(); });
+    EXPECT_FLAG();
+    EXPECT_NO_THROW(f.value());
+  }
+
+  // exception_wrapper, return Future<T> but throw
+  {
+    auto f = makeFuture()
+      .then([]{ throw eggs; return 0; })
+      .onError([&] (exception_wrapper e) {
+        flag();
+        throw eggs;
+        return makeFuture<int>(-1);
+      });
+    EXPECT_FLAG();
+    EXPECT_THROW(f.value(), eggs_t);
+  }
+
+  // exception_wrapper, return T
+  {
+    auto f = makeFuture()
+      .then([]{ throw eggs; return 0; })
+      .onError([&] (exception_wrapper e) {
+        flag();
+        return -1;
+      });
+    EXPECT_FLAG();
+    EXPECT_EQ(-1, f.value());
+  }
+
+  // exception_wrapper, return T but throw
+  {
+    auto f = makeFuture()
+      .then([]{ throw eggs; return 0; })
+      .onError([&] (exception_wrapper e) {
+        flag();
+        throw eggs;
+        return -1;
+      });
+    EXPECT_FLAG();
+    EXPECT_THROW(f.value(), eggs_t);
+  }
+
+  // const exception_wrapper&
+  {
+    auto f = makeFuture()
+      .then([] { throw eggs; })
+      .onError([&] (const exception_wrapper& e) {
+        flag();
+        return makeFuture();
+      });
+    EXPECT_FLAG();
+    EXPECT_NO_THROW(f.value());
+  }
+
 }
 
 TEST(Future, try) {
@@ -275,6 +350,25 @@ TEST(Future, special) {
   EXPECT_TRUE(std::is_move_assignable<Future<int>>::value);
 }
 
+TEST(Future, then) {
+  auto f = makeFuture<string>("0")
+    .then([](){ return makeFuture<string>("1"); })
+    .then([](Try<string>&& t) { return makeFuture(t.value() + ";2"); })
+    .then([](const Try<string>&& t) { return makeFuture(t.value() + ";3"); })
+    .then([](Try<string>& t) { return makeFuture(t.value() + ";4"); })
+    .then([](const Try<string>& t) { return makeFuture(t.value() + ";5"); })
+    .then([](Try<string> t) { return makeFuture(t.value() + ";6"); })
+    .then([](const Try<string> t) { return makeFuture(t.value() + ";7"); })
+    .then([](string&& s) { return makeFuture(s + ";8"); })
+    .then([](const string&& s) { return makeFuture(s + ";9"); })
+    .then([](string& s) { return makeFuture(s + ";10"); })
+    .then([](const string& s) { return makeFuture(s + ";11"); })
+    .then([](string s) { return makeFuture(s + ";12"); })
+    .then([](const string s) { return makeFuture(s + ";13"); })
+  ;
+  EXPECT_EQ(f.value(), "1;2;3;4;5;6;7;8;9;10;11;12;13");
+}
+
 TEST(Future, thenTry) {
   bool flag = false;
 
@@ -356,7 +450,7 @@ TEST(Future, thenFunction) {
   auto f = makeFuture<string>("start")
     .then(doWorkStatic)
     .then(Worker::doWorkStatic)
-    .then(&w, &Worker::doWork);
+    .then(&Worker::doWork, &w);
 
   EXPECT_EQ(f.value(), "start;static;class-static;class");
 }
@@ -378,11 +472,30 @@ TEST(Future, thenFunctionFuture) {
   auto f = makeFuture<string>("start")
     .then(doWorkStaticFuture)
     .then(Worker::doWorkStaticFuture)
-    .then(&w, &Worker::doWorkFuture);
+    .then(&Worker::doWorkFuture, &w);
 
   EXPECT_EQ(f.value(), "start;static;class-static;class");
 }
 
+TEST(Future, thenBind) {
+  auto l = []() {
+    return makeFuture("bind");
+  };
+  auto b = std::bind(l);
+  auto f = makeFuture().then(std::move(b));
+  EXPECT_EQ(f.value(), "bind");
+}
+
+TEST(Future, thenBindTry) {
+  auto l = [](Try<string>&& t) {
+    return makeFuture(t.value() + ";bind");
+  };
+  auto b = std::bind(l, std::placeholders::_1);
+  auto f = makeFuture<string>("start").then(std::move(b));
+
+  EXPECT_EQ(f.value(), "start;bind");
+}
+
 TEST(Future, value) {
   auto f = makeFuture(unique_ptr<int>(new int(42)));
   auto up = std::move(f.value());
@@ -423,12 +536,12 @@ TEST(Future, makeFuture) {
   EXPECT_EQ(42, makeFuture<float>(42).value());
 
   auto fun = [] { return 42; };
-  EXPECT_TYPE(makeFutureTry(fun), Future<int>);
-  EXPECT_EQ(42, makeFutureTry(fun).value());
+  EXPECT_TYPE(makeFutureWith(fun), Future<int>);
+  EXPECT_EQ(42, makeFutureWith(fun).value());
 
   auto failfun = []() -> int { throw eggs; };
-  EXPECT_TYPE(makeFutureTry(failfun), Future<int>);
-  EXPECT_THROW(makeFutureTry(failfun).value(), eggs_t);
+  EXPECT_TYPE(makeFutureWith(failfun), Future<int>);
+  EXPECT_THROW(makeFutureWith(failfun).value(), eggs_t);
 
   EXPECT_TYPE(makeFuture(), Future<void>);
 }
@@ -505,17 +618,17 @@ TEST(Promise, setException) {
   }
 }
 
-TEST(Promise, fulfil) {
+TEST(Promise, setWith) {
   {
     Promise<int> p;
     auto f = p.getFuture();
-    p.fulfil([] { return 42; });
+    p.setWith([] { return 42; });
     EXPECT_EQ(42, f.value());
   }
   {
     Promise<int> p;
     auto f = p.getFuture();
-    p.fulfil([]() -> int { throw eggs; });
+    p.setWith([]() -> int { throw eggs; });
     EXPECT_THROW(f.value(), eggs_t);
   }
 }
@@ -577,7 +690,37 @@ TEST(Future, unwrap) {
   EXPECT_EQ(7, f.value());
 }
 
-TEST(Future, whenAll) {
+TEST(Future, stream) {
+  auto fn = [](vector<int> input, size_t window_size, size_t expect) {
+    auto res = reduce(
+      window(
+        input,
+        [](int i) { return makeFuture(i); },
+        2),
+      0,
+      [](int sum, const Try<int>& b) {
+        return sum + *b;
+      }).get();
+    EXPECT_EQ(expect, res);
+  };
+  {
+    // streaming 2 at a time
+    vector<int> input = {1, 2, 3};
+    fn(input, 2, 6);
+  }
+  {
+    // streaming 4 at a time
+    vector<int> input = {1, 2, 3};
+    fn(input, 4, 6);
+  }
+  {
+    // empty inpt
+    vector<int> input;
+    fn(input, 1, 0);
+  }
+}
+
+TEST(Future, collectAll) {
   // returns a vector variant
   {
     vector<Promise<int>> promises(10);
@@ -586,7 +729,7 @@ TEST(Future, whenAll) {
     for (auto& p : promises)
       futures.push_back(p.getFuture());
 
-    auto allf = whenAll(futures.begin(), futures.end());
+    auto allf = collectAll(futures);
 
     random_shuffle(promises.begin(), promises.end());
     for (auto& p : promises) {
@@ -609,7 +752,7 @@ TEST(Future, whenAll) {
     for (auto& p : promises)
       futures.push_back(p.getFuture());
 
-    auto allf = whenAll(futures.begin(), futures.end());
+    auto allf = collectAll(futures);
 
 
     promises[0].setValue(42);
@@ -641,7 +784,7 @@ TEST(Future, whenAll) {
     for (auto& p : promises)
       futures.push_back(p.getFuture());
 
-    auto allf = whenAll(futures.begin(), futures.end())
+    auto allf = collectAll(futures)
       .then([](Try<vector<Try<void>>>&& ts) {
         for (auto& f : ts.value())
           f.value();
@@ -654,8 +797,165 @@ TEST(Future, whenAll) {
   }
 }
 
+TEST(Future, collect) {
+  // success case
+  {
+    vector<Promise<int>> promises(10);
+    vector<Future<int>> futures;
+
+    for (auto& p : promises)
+      futures.push_back(p.getFuture());
 
-TEST(Future, whenAny) {
+    auto allf = collect(futures);
+
+    random_shuffle(promises.begin(), promises.end());
+    for (auto& p : promises) {
+      EXPECT_FALSE(allf.isReady());
+      p.setValue(42);
+    }
+
+    EXPECT_TRUE(allf.isReady());
+    for (auto i : allf.value()) {
+      EXPECT_EQ(42, i);
+    }
+  }
+
+  // failure case
+  {
+    vector<Promise<int>> promises(10);
+    vector<Future<int>> futures;
+
+    for (auto& p : promises)
+      futures.push_back(p.getFuture());
+
+    auto allf = collect(futures);
+
+    random_shuffle(promises.begin(), promises.end());
+    for (int i = 0; i < 10; i++) {
+      if (i < 5) {
+        // everthing goes well so far...
+        EXPECT_FALSE(allf.isReady());
+        promises[i].setValue(42);
+      } else if (i == 5) {
+        // short circuit with an exception
+        EXPECT_FALSE(allf.isReady());
+        promises[i].setException(eggs);
+        EXPECT_TRUE(allf.isReady());
+      } else if (i < 8) {
+        // don't blow up on further values
+        EXPECT_TRUE(allf.isReady());
+        promises[i].setValue(42);
+      } else {
+        // don't blow up on further exceptions
+        EXPECT_TRUE(allf.isReady());
+        promises[i].setException(eggs);
+      }
+    }
+
+    EXPECT_THROW(allf.value(), eggs_t);
+  }
+
+  // void futures success case
+  {
+    vector<Promise<void>> promises(10);
+    vector<Future<void>> futures;
+
+    for (auto& p : promises)
+      futures.push_back(p.getFuture());
+
+    auto allf = collect(futures);
+
+    random_shuffle(promises.begin(), promises.end());
+    for (auto& p : promises) {
+      EXPECT_FALSE(allf.isReady());
+      p.setValue();
+    }
+
+    EXPECT_TRUE(allf.isReady());
+  }
+
+  // void futures failure case
+  {
+    vector<Promise<void>> promises(10);
+    vector<Future<void>> futures;
+
+    for (auto& p : promises)
+      futures.push_back(p.getFuture());
+
+    auto allf = collect(futures);
+
+    random_shuffle(promises.begin(), promises.end());
+    for (int i = 0; i < 10; i++) {
+      if (i < 5) {
+        // everthing goes well so far...
+        EXPECT_FALSE(allf.isReady());
+        promises[i].setValue();
+      } else if (i == 5) {
+        // short circuit with an exception
+        EXPECT_FALSE(allf.isReady());
+        promises[i].setException(eggs);
+        EXPECT_TRUE(allf.isReady());
+      } else if (i < 8) {
+        // don't blow up on further values
+        EXPECT_TRUE(allf.isReady());
+        promises[i].setValue();
+      } else {
+        // don't blow up on further exceptions
+        EXPECT_TRUE(allf.isReady());
+        promises[i].setException(eggs);
+      }
+    }
+
+    EXPECT_THROW(allf.value(), eggs_t);
+  }
+
+  // move only compiles
+  {
+    vector<Promise<unique_ptr<int>>> promises(10);
+    vector<Future<unique_ptr<int>>> futures;
+
+    for (auto& p : promises)
+      futures.push_back(p.getFuture());
+
+    collect(futures);
+  }
+
+}
+
+struct NotDefaultConstructible {
+  NotDefaultConstructible() = delete;
+  NotDefaultConstructible(int arg) : i(arg) {}
+  int i;
+};
+
+// We have a specialized implementation for non-default-constructible objects
+// Ensure that it works and preserves order
+TEST(Future, collectNotDefaultConstructible) {
+  vector<Promise<NotDefaultConstructible>> promises(10);
+  vector<Future<NotDefaultConstructible>> futures;
+  vector<int> indices(10);
+  std::iota(indices.begin(), indices.end(), 0);
+  random_shuffle(indices.begin(), indices.end());
+
+  for (auto& p : promises)
+    futures.push_back(p.getFuture());
+
+  auto allf = collect(futures);
+
+  for (auto i : indices) {
+    EXPECT_FALSE(allf.isReady());
+    promises[i].setValue(NotDefaultConstructible(i));
+  }
+
+  EXPECT_TRUE(allf.isReady());
+  int i = 0;
+  for (auto val : allf.value()) {
+    EXPECT_EQ(i, val.i);
+    i++;
+  }
+}
+
+TEST(Future, collectAny) {
   {
     vector<Promise<int>> promises(10);
     vector<Future<int>> futures;
@@ -667,7 +967,7 @@ TEST(Future, whenAny) {
       EXPECT_FALSE(f.isReady());
     }
 
-    auto anyf = whenAny(futures.begin(), futures.end());
+    auto anyf = collectAny(futures);
 
     /* futures were moved in, so these are invalid now */
     EXPECT_FALSE(anyf.isReady());
@@ -695,7 +995,7 @@ TEST(Future, whenAny) {
       EXPECT_FALSE(f.isReady());
     }
 
-    auto anyf = whenAny(futures.begin(), futures.end());
+    auto anyf = collectAny(futures);
 
     EXPECT_FALSE(anyf.isReady());
 
@@ -712,9 +1012,9 @@ TEST(Future, whenAny) {
     for (auto& p : promises)
       futures.push_back(p.getFuture());
 
-    auto anyf = whenAny(futures.begin(), futures.end())
-      .then([](Try<pair<size_t, Try<int>>>&& f) {
-        EXPECT_EQ(42, f.value().second.value());
+    auto anyf = collectAny(futures)
+      .then([](pair<size_t, Try<int>> p) {
+        EXPECT_EQ(42, p.second.value());
       });
 
     promises[3].setValue(42);
@@ -729,9 +1029,9 @@ TEST(when, already_completed) {
     for (int i = 0; i < 10; i++)
       fs.push_back(makeFuture());
 
-    whenAll(fs.begin(), fs.end())
-      .then([&](Try<vector<Try<void>>>&& t) {
-        EXPECT_EQ(fs.size(), t.value().size());
+    collectAll(fs)
+      .then([&](vector<Try<void>> ts) {
+        EXPECT_EQ(fs.size(), ts.size());
       });
   }
   {
@@ -739,15 +1039,14 @@ TEST(when, already_completed) {
     for (int i = 0; i < 10; i++)
       fs.push_back(makeFuture(i));
 
-    whenAny(fs.begin(), fs.end())
-      .then([&](Try<pair<size_t, Try<int>>>&& t) {
-        auto& p = t.value();
+    collectAny(fs)
+      .then([&](pair<size_t, Try<int>> p) {
         EXPECT_EQ(p.first, p.second.value());
       });
   }
 }
 
-TEST(when, whenN) {
+TEST(when, collectN) {
   vector<Promise<void>> promises(10);
   vector<Future<void>> futures;
 
@@ -756,10 +1055,9 @@ TEST(when, whenN) {
 
   bool flag = false;
   size_t n = 3;
-  whenN(futures.begin(), futures.end(), n)
-    .then([&](Try<vector<pair<size_t, Try<void>>>>&& t) {
+  collectN(futures, n)
+    .then([&](vector<pair<size_t, Try<void>>> v) {
       flag = true;
-      auto v = t.value();
       EXPECT_EQ(n, v.size());
       for (auto& tt : v)
         EXPECT_TRUE(tt.second.hasValue());
@@ -788,7 +1086,7 @@ TEST(when, small_vector) {
     for (int i = 0; i < 10; i++)
       futures.push_back(makeFuture());
 
-    auto anyf = whenAny(futures.begin(), futures.end());
+    auto anyf = collectAny(futures);
   }
 
   {
@@ -797,24 +1095,23 @@ TEST(when, small_vector) {
     for (int i = 0; i < 10; i++)
       futures.push_back(makeFuture());
 
-    auto allf = whenAll(futures.begin(), futures.end());
+    auto allf = collectAll(futures);
   }
 }
 
-TEST(Future, whenAllVariadic) {
+TEST(Future, collectAllVariadic) {
   Promise<bool> pb;
   Promise<int> pi;
   Future<bool> fb = pb.getFuture();
   Future<int> fi = pi.getFuture();
   bool flag = false;
-  whenAll(std::move(fb), std::move(fi))
-    .then([&](Try<std::tuple<Try<bool>, Try<int>>>&& t) {
+  collectAll(std::move(fb), std::move(fi))
+    .then([&](std::tuple<Try<bool>, Try<int>> tup) {
       flag = true;
-      EXPECT_TRUE(t.hasValue());
-      EXPECT_TRUE(std::get<0>(t.value()).hasValue());
-      EXPECT_EQ(std::get<0>(t.value()).value(), true);
-      EXPECT_TRUE(std::get<1>(t.value()).hasValue());
-      EXPECT_EQ(std::get<1>(t.value()).value(), 42);
+      EXPECT_TRUE(std::get<0>(tup).hasValue());
+      EXPECT_EQ(std::get<0>(tup).value(), true);
+      EXPECT_TRUE(std::get<1>(tup).hasValue());
+      EXPECT_EQ(std::get<1>(tup).value(), 42);
     });
   pb.setValue(true);
   EXPECT_FALSE(flag);
@@ -822,20 +1119,19 @@ TEST(Future, whenAllVariadic) {
   EXPECT_TRUE(flag);
 }
 
-TEST(Future, whenAllVariadicReferences) {
+TEST(Future, collectAllVariadicReferences) {
   Promise<bool> pb;
   Promise<int> pi;
   Future<bool> fb = pb.getFuture();
   Future<int> fi = pi.getFuture();
   bool flag = false;
-  whenAll(fb, fi)
-    .then([&](Try<std::tuple<Try<bool>, Try<int>>>&& t) {
+  collectAll(fb, fi)
+    .then([&](std::tuple<Try<bool>, Try<int>> tup) {
       flag = true;
-      EXPECT_TRUE(t.hasValue());
-      EXPECT_TRUE(std::get<0>(t.value()).hasValue());
-      EXPECT_EQ(std::get<0>(t.value()).value(), true);
-      EXPECT_TRUE(std::get<1>(t.value()).hasValue());
-      EXPECT_EQ(std::get<1>(t.value()).value(), 42);
+      EXPECT_TRUE(std::get<0>(tup).hasValue());
+      EXPECT_EQ(std::get<0>(tup).value(), true);
+      EXPECT_TRUE(std::get<1>(tup).hasValue());
+      EXPECT_EQ(std::get<1>(tup).value(), 42);
     });
   pb.setValue(true);
   EXPECT_FALSE(flag);
@@ -843,9 +1139,9 @@ TEST(Future, whenAllVariadicReferences) {
   EXPECT_TRUE(flag);
 }
 
-TEST(Future, whenAll_none) {
+TEST(Future, collectAll_none) {
   vector<Future<int>> fs;
-  auto f = whenAll(fs.begin(), fs.end());
+  auto f = collectAll(fs);
   EXPECT_TRUE(f.isReady());
 }
 
@@ -877,31 +1173,31 @@ TEST(Future, throwIfFailed) {
     });
 }
 
-TEST(Future, waitWithSemaphoreImmediate) {
-  waitWithSemaphore(makeFuture());
-  auto done = waitWithSemaphore(makeFuture(42)).value();
+TEST(Future, waitImmediate) {
+  makeFuture().wait();
+  auto done = makeFuture(42).wait().value();
   EXPECT_EQ(42, done);
 
   vector<int> v{1,2,3};
-  auto done_v = waitWithSemaphore(makeFuture(v)).value();
+  auto done_v = makeFuture(v).wait().value();
   EXPECT_EQ(v.size(), done_v.size());
   EXPECT_EQ(v, done_v);
 
   vector<Future<void>> v_f;
   v_f.push_back(makeFuture());
   v_f.push_back(makeFuture());
-  auto done_v_f = waitWithSemaphore(whenAll(v_f.begin(), v_f.end())).value();
+  auto done_v_f = collectAll(v_f).wait().value();
   EXPECT_EQ(2, done_v_f.size());
 
   vector<Future<bool>> v_fb;
   v_fb.push_back(makeFuture(true));
   v_fb.push_back(makeFuture(false));
-  auto fut = whenAll(v_fb.begin(), v_fb.end());
-  auto done_v_fb = std::move(waitWithSemaphore(std::move(fut)).value());
+  auto fut = collectAll(v_fb);
+  auto done_v_fb = std::move(fut.wait().value());
   EXPECT_EQ(2, done_v_fb.size());
 }
 
-TEST(Future, waitWithSemaphore) {
+TEST(Future, wait) {
   Promise<int> p;
   Future<int> f = p.getFuture();
   std::atomic<bool> flag{false};
@@ -914,7 +1210,7 @@ TEST(Future, waitWithSemaphore) {
           return t.value();
         });
       flag = true;
-      result.store(waitWithSemaphore(std::move(n)).value());
+      result.store(n.wait().value());
     },
     std::move(f)
     );
@@ -928,33 +1224,78 @@ TEST(Future, waitWithSemaphore) {
   EXPECT_EQ(result.load(), 42);
 }
 
-TEST(Future, waitWithSemaphoreForTime) {
+struct MoveFlag {
+  MoveFlag() = default;
+  MoveFlag(const MoveFlag&) = delete;
+  MoveFlag(MoveFlag&& other) noexcept {
+    other.moved = true;
+  }
+  bool moved{false};
+};
+
+TEST(Future, waitReplacesSelf) {
+  // wait
+  {
+    // lvalue
+    auto f1 = makeFuture(MoveFlag());
+    f1.wait();
+    EXPECT_FALSE(f1.value().moved);
+
+    // rvalue
+    auto f2 = makeFuture(MoveFlag()).wait();
+    EXPECT_FALSE(f2.value().moved);
+  }
+
+  // wait(Duration)
+  {
+    // lvalue
+    auto f1 = makeFuture(MoveFlag());
+    f1.wait(milliseconds(1));
+    EXPECT_FALSE(f1.value().moved);
+
+    // rvalue
+    auto f2 = makeFuture(MoveFlag()).wait(milliseconds(1));
+    EXPECT_FALSE(f2.value().moved);
+  }
+
+  // waitVia
+  {
+    folly::EventBase eb;
+    // lvalue
+    auto f1 = makeFuture(MoveFlag());
+    f1.waitVia(&eb);
+    EXPECT_FALSE(f1.value().moved);
+
+    // rvalue
+    auto f2 = makeFuture(MoveFlag()).waitVia(&eb);
+    EXPECT_FALSE(f2.value().moved);
+  }
+}
+
+TEST(Future, waitWithDuration) {
  {
   Promise<int> p;
   Future<int> f = p.getFuture();
-  auto t = waitWithSemaphore(std::move(f),
-    std::chrono::microseconds(1));
-  EXPECT_FALSE(t.isReady());
+  f.wait(milliseconds(1));
+  EXPECT_FALSE(f.isReady());
   p.setValue(1);
-  EXPECT_TRUE(t.isReady());
+  EXPECT_TRUE(f.isReady());
  }
  {
   Promise<int> p;
   Future<int> f = p.getFuture();
   p.setValue(1);
-  auto t = waitWithSemaphore(std::move(f),
-    std::chrono::milliseconds(1));
-  EXPECT_TRUE(t.isReady());
+  f.wait(milliseconds(1));
+  EXPECT_TRUE(f.isReady());
  }
  {
   vector<Future<bool>> v_fb;
   v_fb.push_back(makeFuture(true));
   v_fb.push_back(makeFuture(false));
-  auto f = whenAll(v_fb.begin(), v_fb.end());
-  auto t = waitWithSemaphore(std::move(f),
-    std::chrono::milliseconds(1));
-  EXPECT_TRUE(t.isReady());
-  EXPECT_EQ(2, t.value().size());
+  auto f = collectAll(v_fb);
+  f.wait(milliseconds(1));
+  EXPECT_TRUE(f.isReady());
+  EXPECT_EQ(2, f.value().size());
  }
  {
   vector<Future<bool>> v_fb;
@@ -962,67 +1303,98 @@ TEST(Future, waitWithSemaphoreForTime) {
   Promise<bool> p2;
   v_fb.push_back(p1.getFuture());
   v_fb.push_back(p2.getFuture());
-  auto f = whenAll(v_fb.begin(), v_fb.end());
-  auto t = waitWithSemaphore(std::move(f),
-    std::chrono::milliseconds(1));
-  EXPECT_FALSE(t.isReady());
+  auto f = collectAll(v_fb);
+  f.wait(milliseconds(1));
+  EXPECT_FALSE(f.isReady());
   p1.setValue(true);
-  EXPECT_FALSE(t.isReady());
+  EXPECT_FALSE(f.isReady());
   p2.setValue(true);
-  EXPECT_TRUE(t.isReady());
+  EXPECT_TRUE(f.isReady());
  }
  {
-  auto t = waitWithSemaphore(makeFuture(),
-    std::chrono::milliseconds(1));
-  EXPECT_TRUE(t.isReady());
+  auto f = makeFuture().wait(milliseconds(1));
+  EXPECT_TRUE(f.isReady());
  }
-}
-
-TEST(Future, callbackAfterActivate) {
-  Promise<void> p;
-  auto f = p.getFuture();
-  f.deactivate();
 
-  size_t count = 0;
-  f.then([&](Try<void>&&) { count++; });
-
-  p.setValue();
-  EXPECT_EQ(0, count);
+ {
+   Promise<void> p;
+   auto start = std::chrono::steady_clock::now();
+   auto f = p.getFuture().wait(milliseconds(100));
+   auto elapsed = std::chrono::steady_clock::now() - start;
+   EXPECT_GE(elapsed, milliseconds(100));
+   EXPECT_FALSE(f.isReady());
+   p.setValue();
+   EXPECT_TRUE(f.isReady());
+ }
 
-  f.activate();
-  EXPECT_EQ(1, count);
+ {
+   // Try to trigger the race where the resultant Future is not yet complete
+   // even if we didn't hit the timeout, and make sure we deal with it properly
+   Promise<void> p;
+   folly::Baton<> b;
+   auto t = std::thread([&]{
+     b.post();
+     /* sleep override */ std::this_thread::sleep_for(milliseconds(100));
+     p.setValue();
+   });
+   b.wait();
+   auto f = p.getFuture().wait(std::chrono::seconds(3600));
+   EXPECT_TRUE(f.isReady());
+   t.join();
+ }
 }
 
-TEST(Future, activateOnDestruct) {
-  auto f = std::make_shared<Future<void>>(makeFuture());
-  f->deactivate();
-
-  size_t count = 0;
-  f->then([&](Try<void>&&) { count++; });
-  EXPECT_EQ(0, count);
-
-  f.reset();
-  EXPECT_EQ(1, count);
-}
+class DummyDrivableExecutor : public DrivableExecutor {
+ public:
+  void add(Func f) override {}
+  void drive() override { ran = true; }
+  bool ran{false};
+};
 
-TEST(Future, viaActsCold) {
-  ManualExecutor x;
-  size_t count = 0;
+TEST(Future, getVia) {
+  {
+    // non-void
+    ManualExecutor x;
+    auto f = via(&x).then([]{ return true; });
+    EXPECT_TRUE(f.getVia(&x));
+  }
 
-  auto fv = via(&x);
-  fv.then([&](Try<void>&&) { count++; });
+  {
+    // void
+    ManualExecutor x;
+    auto f = via(&x).then();
+    f.getVia(&x);
+  }
 
-  EXPECT_EQ(0, count);
+  {
+    DummyDrivableExecutor x;
+    auto f = makeFuture(true);
+    EXPECT_TRUE(f.getVia(&x));
+    EXPECT_FALSE(x.ran);
+  }
+}
 
-  fv.activate();
+TEST(Future, waitVia) {
+  {
+    ManualExecutor x;
+    auto f = via(&x).then();
+    EXPECT_FALSE(f.isReady());
+    f.waitVia(&x);
+    EXPECT_TRUE(f.isReady());
+  }
 
-  EXPECT_EQ(1, x.run());
-  EXPECT_EQ(1, count);
-}
+  {
+    // try rvalue as well
+    ManualExecutor x;
+    auto f = via(&x).then().waitVia(&x);
+    EXPECT_TRUE(f.isReady());
+  }
 
-TEST(Future, viaIsCold) {
-  ManualExecutor x;
-  EXPECT_FALSE(via(&x).isActive());
+  {
+    DummyDrivableExecutor x;
+    makeFuture(true).waitVia(&x);
+    EXPECT_FALSE(x.ran);
+  }
 }
 
 TEST(Future, viaRaces) {
@@ -1048,35 +1420,6 @@ TEST(Future, viaRaces) {
   t2.join();
 }
 
-// TODO(#4920689)
-TEST(Future, viaRaces_2stage) {
-  ManualExecutor x;
-  Promise<void> p;
-  auto tid = std::this_thread::get_id();
-  bool done = false;
-
-  std::thread t1([&] {
-    auto f2 = p.getFuture().via(&x);
-    f2.then([&](Try<void>&&) { EXPECT_EQ(tid, std::this_thread::get_id()); })
-      .then([&](Try<void>&&) { EXPECT_EQ(tid, std::this_thread::get_id()); })
-      .then([&](Try<void>&&) { done = true; });
-
-    // the bug was in the promise being fulfilled before f2 is reactivated. we
-    // could sleep, but yielding should cause this to fail with reasonable
-    // probability
-    std::this_thread::yield();
-    f2.activate();
-  });
-
-  std::thread t2([&] {
-    p.setValue();
-  });
-
-  while (!done) x.run();
-  t1.join();
-  t2.join();
-}
-
 TEST(Future, getFuture_after_setValue) {
   Promise<int> p;
   p.setValue(42);
@@ -1085,7 +1428,7 @@ TEST(Future, getFuture_after_setValue) {
 
 TEST(Future, getFuture_after_setException) {
   Promise<void> p;
-  p.fulfil([]() -> void { throw std::logic_error("foo"); });
+  p.setWith([]() -> void { throw std::logic_error("foo"); });
   EXPECT_THROW(p.getFuture().value(), std::logic_error);
 }
 
@@ -1147,7 +1490,7 @@ TEST(Future, context) {
 
   EXPECT_EQ(nullptr, RequestContext::get()->getContextData("test"));
 
-  // Fulfil the promise
+  // Fulfill the promise
   p.setValue();
 }
 
@@ -1172,10 +1515,10 @@ TEST(Future, t5506504) {
       for (auto& p : *promises) p.setValue();
     });
 
-    return whenAll(futures.begin(), futures.end());
+    return collectAll(futures);
   };
 
-  waitWithSemaphore(fn());
+  fn().wait();
 }
 
 // Test of handling of a circular dependency. It's never recommended
@@ -1199,5 +1542,347 @@ TEST(Future, CircularDependencySharedPtrSelfReset) {
 
   ptr.reset();
 
-  promise.fulfil([]{return 1l;});
+  promise.setWith([]{return 1l;});
+}
+
+TEST(Future, Constructor) {
+  auto f1 = []() -> Future<int> { return Future<int>(3); }();
+  EXPECT_EQ(f1.value(), 3);
+  auto f2 = []() -> Future<void> { return Future<void>(); }();
+  EXPECT_NO_THROW(f2.value());
+}
+
+TEST(Future, ImplicitConstructor) {
+  auto f1 = []() -> Future<int> { return 3; }();
+  EXPECT_EQ(f1.value(), 3);
+  // Unfortunately, the C++ standard does not allow the
+  // following implicit conversion to work:
+  //auto f2 = []() -> Future<void> { }();
+}
+
+TEST(Future, thenDynamic) {
+  // folly::dynamic has a constructor that takes any T, this test makes
+  // sure that we call the then lambda with folly::dynamic and not
+  // Try<folly::dynamic> because that then fails to compile
+  Promise<folly::dynamic> p;
+  Future<folly::dynamic> f = p.getFuture().then(
+      [](const folly::dynamic& d) {
+        return folly::dynamic(d.asInt() + 3);
+      }
+  );
+  p.setValue(2);
+  EXPECT_EQ(f.get(), 5);
+}
+
+TEST(Future, via_then_get_was_racy) {
+  ThreadExecutor x;
+  std::unique_ptr<int> val = folly::via(&x)
+    .then([] { return folly::make_unique<int>(42); })
+    .get();
+  ASSERT_TRUE(!!val);
+  EXPECT_EQ(42, *val);
+}
+
+TEST(Future, ensure) {
+  size_t count = 0;
+  auto cob = [&]{ count++; };
+  auto f = makeFuture(42)
+    .ensure(cob)
+    .then([](int) { throw std::runtime_error("ensure"); })
+    .ensure(cob);
+
+  EXPECT_THROW(f.get(), std::runtime_error);
+  EXPECT_EQ(2, count);
+}
+
+TEST(Future, willEqual) {
+    //both p1 and p2 already fulfilled
+    {
+    Promise<int> p1;
+    Promise<int> p2;
+    p1.setValue(27);
+    p2.setValue(27);
+    auto f1 = p1.getFuture();
+    auto f2 = p2.getFuture();
+    EXPECT_TRUE(f1.willEqual(f2).get());
+    }{
+    Promise<int> p1;
+    Promise<int> p2;
+    p1.setValue(27);
+    p2.setValue(36);
+    auto f1 = p1.getFuture();
+    auto f2 = p2.getFuture();
+    EXPECT_FALSE(f1.willEqual(f2).get());
+    }
+    //both p1 and p2 not yet fulfilled
+    {
+    Promise<int> p1;
+    Promise<int> p2;
+    auto f1 = p1.getFuture();
+    auto f2 = p2.getFuture();
+    auto f3 = f1.willEqual(f2);
+    p1.setValue(27);
+    p2.setValue(27);
+    EXPECT_TRUE(f3.get());
+    }{
+    Promise<int> p1;
+    Promise<int> p2;
+    auto f1 = p1.getFuture();
+    auto f2 = p2.getFuture();
+    auto f3 = f1.willEqual(f2);
+    p1.setValue(27);
+    p2.setValue(36);
+    EXPECT_FALSE(f3.get());
+    }
+    //p1 already fulfilled, p2 not yet fulfilled
+    {
+    Promise<int> p1;
+    Promise<int> p2;
+    p1.setValue(27);
+    auto f1 = p1.getFuture();
+    auto f2 = p2.getFuture();
+    auto f3 = f1.willEqual(f2);
+    p2.setValue(27);
+    EXPECT_TRUE(f3.get());
+    }{
+    Promise<int> p1;
+    Promise<int> p2;
+    p1.setValue(27);
+    auto f1 = p1.getFuture();
+    auto f2 = p2.getFuture();
+    auto f3 = f1.willEqual(f2);
+    p2.setValue(36);
+    EXPECT_FALSE(f3.get());
+    }
+    //p2 already fulfilled, p1 not yet fulfilled
+    {
+    Promise<int> p1;
+    Promise<int> p2;
+    p2.setValue(27);
+    auto f1 = p1.getFuture();
+    auto f2 = p2.getFuture();
+    auto f3 = f1.willEqual(f2);
+    p1.setValue(27);
+    EXPECT_TRUE(f3.get());
+    }{
+    Promise<int> p1;
+    Promise<int> p2;
+    p2.setValue(36);
+    auto f1 = p1.getFuture();
+    auto f2 = p2.getFuture();
+    auto f3 = f1.willEqual(f2);
+    p1.setValue(27);
+    EXPECT_FALSE(f3.get());
+    }
+}
+
+// Unwrap tests.
+
+// A simple scenario for the unwrap call, when the promise was fulfilled
+// before calling to unwrap.
+TEST(Future, Unwrap_SimpleScenario) {
+  Future<int> encapsulated_future = makeFuture(5484);
+  Future<Future<int>> future = makeFuture(std::move(encapsulated_future));
+  EXPECT_EQ(5484, future.unwrap().value());
+}
+
+// Makes sure that unwrap() works when chaning Future's commands.
+TEST(Future, Unwrap_ChainCommands) {
+  Future<Future<int>> future = makeFuture(makeFuture(5484));
+  auto unwrapped = future.unwrap().then([](int i){ return i; });
+  EXPECT_EQ(5484, unwrapped.value());
+}
+
+// Makes sure that the unwrap call also works when the promise was not yet
+// fulfilled, and that the returned Future<T> becomes ready once the promise
+// is fulfilled.
+TEST(Future, Unwrap_FutureNotReady) {
+  Promise<Future<int>> p;
+  Future<Future<int>> future = p.getFuture();
+  Future<int> unwrapped = future.unwrap();
+  // Sanity - should not be ready before the promise is fulfilled.
+  ASSERT_FALSE(unwrapped.isReady());
+  // Fulfill the promise and make sure the unwrapped future is now ready.
+  p.setValue(makeFuture(5484));
+  ASSERT_TRUE(unwrapped.isReady());
+  EXPECT_EQ(5484, unwrapped.value());
+}
+
+TEST(Reduce, Basic) {
+  auto makeFutures = [](int count) {
+    std::vector<Future<int>> fs;
+    for (int i = 1; i <= count; ++i) {
+      fs.emplace_back(makeFuture(i));
+    }
+    return fs;
+  };
+
+  // Empty (Try)
+  {
+    auto fs = makeFutures(0);
+
+    Future<double> f1 = reduce(fs, 1.2,
+      [](double a, Try<int>&& b){
+        return a + *b + 0.1;
+      });
+    EXPECT_EQ(1.2, f1.get());
+  }
+
+  // One (Try)
+  {
+    auto fs = makeFutures(1);
+
+    Future<double> f1 = reduce(fs, 0.0,
+      [](double a, Try<int>&& b){
+        return a + *b + 0.1;
+      });
+    EXPECT_EQ(1.1, f1.get());
+  }
+
+  // Returning values (Try)
+  {
+    auto fs = makeFutures(3);
+
+    Future<double> f1 = reduce(fs, 0.0,
+      [](double a, Try<int>&& b){
+        return a + *b + 0.1;
+      });
+    EXPECT_EQ(6.3, f1.get());
+  }
+
+  // Returning values
+  {
+    auto fs = makeFutures(3);
+
+    Future<double> f1 = reduce(fs, 0.0,
+      [](double a, int&& b){
+        return a + b + 0.1;
+      });
+    EXPECT_EQ(6.3, f1.get());
+  }
+
+  // Returning futures (Try)
+  {
+    auto fs = makeFutures(3);
+
+    Future<double> f2 = reduce(fs, 0.0,
+      [](double a, Try<int>&& b){
+        return makeFuture<double>(a + *b + 0.1);
+      });
+    EXPECT_EQ(6.3, f2.get());
+  }
+
+  // Returning futures
+  {
+    auto fs = makeFutures(3);
+
+    Future<double> f2 = reduce(fs, 0.0,
+      [](double a, int&& b){
+        return makeFuture<double>(a + b + 0.1);
+      });
+    EXPECT_EQ(6.3, f2.get());
+  }
+}
+
+TEST(Reduce, Chain) {
+  auto makeFutures = [](int count) {
+    std::vector<Future<int>> fs;
+    for (int i = 1; i <= count; ++i) {
+      fs.emplace_back(makeFuture(i));
+    }
+    return fs;
+  };
+
+  {
+    auto f = collectAll(makeFutures(3)).reduce(0, [](int a, Try<int>&& b){
+      return a + *b;
+    });
+    EXPECT_EQ(6, f.get());
+  }
+  {
+    auto f = collect(makeFutures(3)).reduce(0, [](int a, int&& b){
+      return a + b;
+    });
+    EXPECT_EQ(6, f.get());
+  }
+}
+
+TEST(Reduce, Streaming) {
+  {
+    std::vector<Future<int>> fs;
+    fs.push_back(makeFuture(1));
+    fs.push_back(makeFuture(2));
+    fs.push_back(makeFuture(3));
+
+    Future<double> f = unorderedReduce(fs.begin(), fs.end(), 0.0,
+      [](double a, int&& b){
+        return double(b);
+      });
+    EXPECT_EQ(3.0, f.get());
+  }
+  {
+    Promise<int> p1;
+    Promise<int> p2;
+    Promise<int> p3;
+
+    std::vector<Future<int>> fs;
+    fs.push_back(p1.getFuture());
+    fs.push_back(p2.getFuture());
+    fs.push_back(p3.getFuture());
+
+    Future<double> f = unorderedReduce(fs.begin(), fs.end(), 0.0,
+      [](double a, int&& b){
+        return double(b);
+      });
+    p3.setValue(3);
+    p2.setValue(2);
+    p1.setValue(1);
+    EXPECT_EQ(1.0, f.get());
+  }
+}
+
+TEST(Reduce, StreamingException) {
+  Promise<int> p1;
+  Promise<int> p2;
+  Promise<int> p3;
+
+  std::vector<Future<int>> fs;
+  fs.push_back(p1.getFuture());
+  fs.push_back(p2.getFuture());
+  fs.push_back(p3.getFuture());
+
+  Future<double> f = unorderedReduce(fs.begin(), fs.end(), 0.0,
+    [](double a, int&& b){
+      return b + 0.0;
+    });
+  p3.setValue(3);
+  p2.setException(exception_wrapper(std::runtime_error("blah")));
+  p1.setValue(1);
+  EXPECT_THROW(f.get(), std::runtime_error);
+}
+
+TEST(Map, Basic) {
+  Promise<int> p1;
+  Promise<int> p2;
+  Promise<int> p3;
+
+  std::vector<Future<int>> fs;
+  fs.push_back(p1.getFuture());
+  fs.push_back(p2.getFuture());
+  fs.push_back(p3.getFuture());
+
+  int c = 0;
+  std::vector<Future<void>> fs2 = futures::map(fs, [&](int i){
+    c += i;
+  });
+
+  // Ensure we call the callbacks as the futures complete regardless of order
+  p2.setValue(1);
+  EXPECT_EQ(1, c);
+  p3.setValue(1);
+  EXPECT_EQ(2, c);
+  p1.setValue(1);
+  EXPECT_EQ(3, c);
+
+  EXPECT_TRUE(collect(fs2).isReady());
 }