Fix copyright lines
[folly.git] / folly / futures / test / ViaTest.cpp
index d940e10ec49b4b9c46c90dd31fe16f06c074f63c..a2392abdf3a2404ff05f7087e565e88067f70fc0 100644 (file)
@@ -1,5 +1,5 @@
 /*
- * Copyright 2014 Facebook, Inc.
+ * Copyright 2014-present Facebook, Inc.
  *
  * Licensed under the Apache License, Version 2.0 (the "License");
  * you may not use this file except in compliance with the License.
  * limitations under the License.
  */
 
-#include <gtest/gtest.h>
 #include <thread>
 
+#include <folly/MPMCQueue.h>
+#include <folly/executors/DrivableExecutor.h>
+#include <folly/executors/InlineExecutor.h>
+#include <folly/executors/ManualExecutor.h>
 #include <folly/futures/Future.h>
-#include <folly/futures/InlineExecutor.h>
-#include <folly/futures/ManualExecutor.h>
+#include <folly/portability/GTest.h>
+#include <folly/synchronization/Baton.h>
 
-using namespace folly::wangle;
+using namespace folly;
 
-struct ManualWaiter {
+struct ManualWaiter : public DrivableExecutor {
   explicit ManualWaiter(std::shared_ptr<ManualExecutor> ex) : ex(ex) {}
 
-  void makeProgress() {
+  void add(Func f) override {
+    ex->add(std::move(f));
+  }
+
+  void drive() override {
     ex->wait();
     ex->run();
   }
@@ -42,13 +49,14 @@ struct ViaFixture : public testing::Test {
     done(false)
   {
     t = std::thread([=] {
-      ManualWaiter eastWaiter(eastExecutor);
-      while (!done)
-        eastWaiter.makeProgress();
+        ManualWaiter eastWaiter(eastExecutor);
+        while (!done) {
+          eastWaiter.drive();
+        }
       });
   }
 
-  ~ViaFixture() {
+  ~ViaFixture() override {
     done = true;
     eastExecutor->add([=]() { });
     t.join();
@@ -64,16 +72,16 @@ struct ViaFixture : public testing::Test {
   std::shared_ptr<ManualExecutor> eastExecutor;
   std::shared_ptr<ManualWaiter> waiter;
   InlineExecutor inlineExecutor;
-  bool done;
+  std::atomic<bool> done;
   std::thread t;
 };
 
-TEST(Via, exception_on_launch) {
+TEST(Via, exceptionOnLaunch) {
   auto future = makeFuture<int>(std::runtime_error("E"));
   EXPECT_THROW(future.value(), std::runtime_error);
 }
 
-TEST(Via, then_value) {
+TEST(Via, thenValue) {
   auto future = makeFuture(std::move(1))
     .then([](Try<int>&& t) {
       return t.value() == 1;
@@ -83,12 +91,11 @@ TEST(Via, then_value) {
   EXPECT_TRUE(future.value());
 }
 
-TEST(Via, then_future) {
+TEST(Via, thenFuture) {
   auto future = makeFuture(1)
     .then([](Try<int>&& t) {
       return makeFuture(t.value() == 1);
-    })
-    ;
+    });
   EXPECT_TRUE(future.value());
 }
 
@@ -96,7 +103,7 @@ static Future<std::string> doWorkStatic(Try<std::string>&& t) {
   return makeFuture(t.value() + ";static");
 }
 
-TEST(Via, then_function) {
+TEST(Via, thenFunction) {
   struct Worker {
     Future<std::string> doWork(Try<std::string>&& t) {
       return makeFuture(t.value() + ";class");
@@ -109,70 +116,51 @@ TEST(Via, then_function) {
   auto f = makeFuture(std::string("start"))
     .then(doWorkStatic)
     .then(Worker::doWorkStatic)
-    .then(&w, &Worker::doWork)
+    .then(&Worker::doWork, &w)
     ;
 
   EXPECT_EQ(f.value(), "start;static;class-static;class");
 }
 
-TEST_F(ViaFixture, deactivateChain) {
-  bool flag = false;
-  auto f = makeFuture().deactivate();
-  EXPECT_FALSE(f.isActive());
-  auto f2 = f.then([&](Try<void>){ flag = true; });
-  EXPECT_FALSE(flag);
-}
-
-TEST_F(ViaFixture, deactivateActivateChain) {
-  bool flag = false;
-  // you can do this all day long with temporaries.
-  auto f1 = makeFuture().deactivate().activate().deactivate();
-  // Chaining on activate/deactivate requires an rvalue, so you have to move
-  // one of these two ways (if you're not using a temporary).
-  auto f2 = std::move(f1).activate();
-  f2.deactivate();
-  auto f3 = std::move(f2.activate());
-  f3.then([&](Try<void>){ flag = true; });
-  EXPECT_TRUE(flag);
-}
-
-TEST_F(ViaFixture, thread_hops) {
+TEST_F(ViaFixture, threadHops) {
   auto westThreadId = std::this_thread::get_id();
-  auto f = via(eastExecutor.get()).then([=](Try<void>&& t) {
-    EXPECT_NE(std::this_thread::get_id(), westThreadId);
-    return makeFuture<int>(1);
-  }).via(westExecutor.get()
-  ).then([=](Try<int>&& t) {
-    EXPECT_EQ(std::this_thread::get_id(), westThreadId);
-    return t.value();
-  });
-  while (!f.isReady()) {
-    waiter->makeProgress();
-  }
-  EXPECT_EQ(f.value(), 1);
+  auto f = via(eastExecutor.get())
+               .then([=](Try<Unit>&& /* t */) {
+                 EXPECT_NE(std::this_thread::get_id(), westThreadId);
+                 return makeFuture<int>(1);
+               })
+               .via(westExecutor.get())
+               .then([=](Try<int>&& t) {
+                 EXPECT_EQ(std::this_thread::get_id(), westThreadId);
+                 return t.value();
+               });
+  EXPECT_EQ(f.getVia(waiter.get()), 1);
 }
 
-TEST_F(ViaFixture, chain_vias) {
+TEST_F(ViaFixture, chainVias) {
   auto westThreadId = std::this_thread::get_id();
-  auto f = via(eastExecutor.get()).then([=](Try<void>&& t) {
+  auto f = via(eastExecutor.get()).then([=]() {
     EXPECT_NE(std::this_thread::get_id(), westThreadId);
-    return makeFuture<int>(1);
-  }).then([=](Try<int>&& t) {
-    int val = t.value();
-    return makeFuture(std::move(val)).via(westExecutor.get())
-      .then([=](Try<int>&& t) mutable {
+    return 1;
+  }).then([=](int val) {
+    return makeFuture(val).via(westExecutor.get())
+      .then([=](int v) mutable {
         EXPECT_EQ(std::this_thread::get_id(), westThreadId);
-        return t.value();
+        return v + 1;
       });
-  }).then([=](Try<int>&& t) {
+  }).then([=](int val) {
+    // even though ultimately the future that triggers this one executed in
+    // the west thread, this then() inherited the executor from its
+    // predecessor, ie the eastExecutor.
+    EXPECT_NE(std::this_thread::get_id(), westThreadId);
+    return val + 1;
+  }).via(westExecutor.get()).then([=](int val) {
+    // go back to west, so we can wait on it
     EXPECT_EQ(std::this_thread::get_id(), westThreadId);
-    return t.value();
+    return val + 1;
   });
 
-  while (!f.isReady()) {
-    waiter->makeProgress();
-  }
-  EXPECT_EQ(f.value(), 1);
+  EXPECT_EQ(f.getVia(waiter.get()), 4);
 }
 
 TEST_F(ViaFixture, bareViaAssignment) {
@@ -184,3 +172,440 @@ TEST_F(ViaFixture, viaAssignment) {
   // via()&
   auto f2 = f.via(eastExecutor.get());
 }
+
+TEST(Via, chain1) {
+  EXPECT_EQ(42,
+            makeFuture()
+            .thenMulti([] { return 42; })
+            .get());
+}
+
+TEST(Via, chain3) {
+  int count = 0;
+  auto f = makeFuture().thenMulti(
+      [&]{ count++; return 3.14159; },
+      [&](double) { count++; return std::string("hello"); },
+      [&]{ count++; return makeFuture(42); });
+  EXPECT_EQ(42, f.get());
+  EXPECT_EQ(3, count);
+}
+
+struct PriorityExecutor : public Executor {
+  void add(Func /* f */) override {}
+
+  void addWithPriority(Func f, int8_t priority) override {
+    int mid = getNumPriorities() / 2;
+    int p = priority < 0 ?
+            std::max(0, mid + priority) :
+            std::min(getNumPriorities() - 1, mid + priority);
+    EXPECT_LT(p, 3);
+    EXPECT_GE(p, 0);
+    if (p == 0) {
+      count0++;
+    } else if (p == 1) {
+      count1++;
+    } else if (p == 2) {
+      count2++;
+    }
+    f();
+  }
+
+  uint8_t getNumPriorities() const override {
+    return 3;
+  }
+
+  int count0{0};
+  int count1{0};
+  int count2{0};
+};
+
+TEST(Via, priority) {
+  PriorityExecutor exe;
+  via(&exe, -1).then([]{});
+  via(&exe, 0).then([]{});
+  via(&exe, 1).then([]{});
+  via(&exe, 42).then([]{});  // overflow should go to max priority
+  via(&exe, -42).then([]{}); // underflow should go to min priority
+  via(&exe).then([]{});      // default to mid priority
+  via(&exe, Executor::LO_PRI).then([]{});
+  via(&exe, Executor::HI_PRI).then([]{});
+  EXPECT_EQ(3, exe.count0);
+  EXPECT_EQ(2, exe.count1);
+  EXPECT_EQ(3, exe.count2);
+}
+
+TEST_F(ViaFixture, chainX1) {
+  EXPECT_EQ(42,
+            makeFuture()
+            .thenMultiWithExecutor(eastExecutor.get(),[] { return 42; })
+            .get());
+}
+
+TEST_F(ViaFixture, chainX3) {
+  auto westThreadId = std::this_thread::get_id();
+  int count = 0;
+  auto f = via(westExecutor.get()).thenMultiWithExecutor(
+      eastExecutor.get(),
+      [&]{
+        EXPECT_NE(std::this_thread::get_id(), westThreadId);
+        count++; return 3.14159;
+      },
+      [&](double) { count++; return std::string("hello"); },
+      [&]{ count++; })
+    .then([&](){
+        EXPECT_EQ(std::this_thread::get_id(), westThreadId);
+        return makeFuture(42);
+    });
+  EXPECT_EQ(42, f.getVia(waiter.get()));
+  EXPECT_EQ(3, count);
+}
+
+TEST(Via, then2) {
+  ManualExecutor x1, x2;
+  bool a = false, b = false, c = false;
+  via(&x1)
+    .then([&]{ a = true; })
+    .then(&x2, [&]{ b = true; })
+    .then([&]{ c = true; });
+
+  EXPECT_FALSE(a);
+  EXPECT_FALSE(b);
+
+  x1.run();
+  EXPECT_TRUE(a);
+  EXPECT_FALSE(b);
+  EXPECT_FALSE(c);
+
+  x2.run();
+  EXPECT_TRUE(b);
+  EXPECT_FALSE(c);
+
+  x1.run();
+  EXPECT_TRUE(c);
+}
+
+TEST(Via, then2Variadic) {
+  struct Foo { bool a = false; void foo(Try<Unit>) { a = true; } };
+  Foo f;
+  ManualExecutor x;
+  makeFuture().then(&x, &Foo::foo, &f);
+  EXPECT_FALSE(f.a);
+  x.run();
+  EXPECT_TRUE(f.a);
+}
+
+#ifndef __APPLE__ // TODO #7372389
+/// Simple executor that does work in another thread
+class ThreadExecutor : public Executor {
+  folly::MPMCQueue<Func> funcs;
+  std::atomic<bool> done {false};
+  std::thread worker;
+  folly::Baton<> baton;
+
+  void work() {
+    baton.post();
+    Func fn;
+    while (!done) {
+      while (!funcs.isEmpty()) {
+        funcs.blockingRead(fn);
+        fn();
+      }
+    }
+  }
+
+ public:
+  explicit ThreadExecutor(size_t n = 1024)
+    : funcs(n) {
+    worker = std::thread(std::bind(&ThreadExecutor::work, this));
+  }
+
+  ~ThreadExecutor() override {
+    done = true;
+    funcs.write([]{});
+    worker.join();
+  }
+
+  void add(Func fn) override {
+    funcs.blockingWrite(std::move(fn));
+  }
+
+  void waitForStartup() {
+    baton.wait();
+  }
+};
+
+TEST(Via, viaThenGetWasRacy) {
+  ThreadExecutor x;
+  std::unique_ptr<int> val =
+      folly::via(&x).then([] { return std::make_unique<int>(42); }).get();
+  ASSERT_TRUE(!!val);
+  EXPECT_EQ(42, *val);
+}
+
+TEST(Via, callbackRace) {
+  ThreadExecutor x;
+
+  auto fn = [&x]{
+    auto promises = std::make_shared<std::vector<Promise<Unit>>>(4);
+    std::vector<Future<Unit>> futures;
+
+    for (auto& p : *promises) {
+      futures.emplace_back(
+        p.getFuture()
+        .via(&x)
+        .then([](Try<Unit>&&){}));
+    }
+
+    x.waitForStartup();
+    x.add([promises]{
+      for (auto& p : *promises) {
+        p.setValue();
+      }
+    });
+
+    return collectAll(futures);
+  };
+
+  fn().wait();
+}
+#endif
+
+class DummyDrivableExecutor : public DrivableExecutor {
+ public:
+  void add(Func /* f */) override {}
+  void drive() override { ran = true; }
+  bool ran{false};
+};
+
+TEST(Via, getVia) {
+  {
+    // non-void
+    ManualExecutor x;
+    auto f = via(&x).then([]{ return true; });
+    EXPECT_TRUE(f.getVia(&x));
+  }
+
+  {
+    // void
+    ManualExecutor x;
+    auto f = via(&x).then();
+    f.getVia(&x);
+  }
+
+  {
+    DummyDrivableExecutor x;
+    auto f = makeFuture(true);
+    EXPECT_TRUE(f.getVia(&x));
+    EXPECT_FALSE(x.ran);
+  }
+}
+
+TEST(Via, getTryVia) {
+  {
+    // non-void
+    ManualExecutor x;
+    auto f = via(&x).then([] { return 23; });
+    EXPECT_FALSE(f.isReady());
+    EXPECT_EQ(23, f.getTryVia(&x).value());
+  }
+
+  {
+    // void
+    ManualExecutor x;
+    auto f = via(&x).then();
+    EXPECT_FALSE(f.isReady());
+    auto t = f.getTryVia(&x);
+    EXPECT_TRUE(t.hasValue());
+  }
+
+  {
+    DummyDrivableExecutor x;
+    auto f = makeFuture(23);
+    EXPECT_EQ(23, f.getTryVia(&x).value());
+    EXPECT_FALSE(x.ran);
+  }
+}
+
+TEST(Via, waitVia) {
+  {
+    ManualExecutor x;
+    auto f = via(&x).then();
+    EXPECT_FALSE(f.isReady());
+    f.waitVia(&x);
+    EXPECT_TRUE(f.isReady());
+  }
+
+  {
+    // try rvalue as well
+    ManualExecutor x;
+    auto f = via(&x).then().waitVia(&x);
+    EXPECT_TRUE(f.isReady());
+  }
+
+  {
+    DummyDrivableExecutor x;
+    makeFuture(true).waitVia(&x);
+    EXPECT_FALSE(x.ran);
+  }
+}
+
+TEST(Via, viaRaces) {
+  ManualExecutor x;
+  Promise<Unit> p;
+  auto tid = std::this_thread::get_id();
+  bool done = false;
+
+  std::thread t1([&] {
+    p.getFuture()
+      .via(&x)
+      .then([&](Try<Unit>&&) { EXPECT_EQ(tid, std::this_thread::get_id()); })
+      .then([&](Try<Unit>&&) { EXPECT_EQ(tid, std::this_thread::get_id()); })
+      .then([&](Try<Unit>&&) { done = true; });
+  });
+
+  std::thread t2([&] {
+    p.setValue();
+  });
+
+  while (!done) {
+    x.run();
+  }
+  t1.join();
+  t2.join();
+}
+
+TEST(Via, viaDummyExecutorFutureSetValueFirst) {
+  // The callback object will get destroyed when passed to the executor.
+
+  // A promise will be captured by the callback lambda so we can observe that
+  // it will be destroyed.
+  Promise<Unit> captured_promise;
+  auto captured_promise_future = captured_promise.getFuture();
+
+  DummyDrivableExecutor x;
+  auto future = makeFuture().via(&x).then(
+      [c = std::move(captured_promise)] { return 42; });
+
+  EXPECT_THROW(future.get(std::chrono::seconds(5)), BrokenPromise);
+  EXPECT_THROW(
+      captured_promise_future.get(std::chrono::seconds(5)), BrokenPromise);
+}
+
+TEST(Via, viaDummyExecutorFutureSetCallbackFirst) {
+  // The callback object will get destroyed when passed to the executor.
+
+  // A promise will be captured by the callback lambda so we can observe that
+  // it will be destroyed.
+  Promise<Unit> captured_promise;
+  auto captured_promise_future = captured_promise.getFuture();
+
+  DummyDrivableExecutor x;
+  Promise<Unit> trigger;
+  auto future = trigger.getFuture().via(&x).then(
+      [c = std::move(captured_promise)] { return 42; });
+  trigger.setValue();
+
+  EXPECT_THROW(future.get(std::chrono::seconds(5)), BrokenPromise);
+  EXPECT_THROW(
+      captured_promise_future.get(std::chrono::seconds(5)), BrokenPromise);
+}
+
+TEST(Via, viaExecutorDiscardsTaskFutureSetValueFirst) {
+  // The callback object will get destroyed when the ManualExecutor runs out
+  // of scope.
+
+  // A promise will be captured by the callback lambda so we can observe that
+  // it will be destroyed.
+  Promise<Unit> captured_promise;
+  auto captured_promise_future = captured_promise.getFuture();
+
+  Optional<Future<int>> future;
+  {
+    ManualExecutor x;
+    future = makeFuture().via(&x).then(
+        [c = std::move(captured_promise)] { return 42; });
+  }
+
+  EXPECT_THROW(future->get(std::chrono::seconds(5)), BrokenPromise);
+  EXPECT_THROW(
+      captured_promise_future.get(std::chrono::seconds(5)), BrokenPromise);
+}
+
+TEST(Via, viaExecutorDiscardsTaskFutureSetCallbackFirst) {
+  // The callback object will get destroyed when the ManualExecutor runs out
+  // of scope.
+
+  // A promise will be captured by the callback lambda so we can observe that
+  // it will be destroyed.
+  Promise<Unit> captured_promise;
+  auto captured_promise_future = captured_promise.getFuture();
+
+  Optional<Future<int>> future;
+  {
+    ManualExecutor x;
+    Promise<Unit> trigger;
+    future = trigger.getFuture().via(&x).then(
+        [c = std::move(captured_promise)] { return 42; });
+    trigger.setValue();
+  }
+
+  EXPECT_THROW(future->get(std::chrono::seconds(5)), BrokenPromise);
+  EXPECT_THROW(
+      captured_promise_future.get(std::chrono::seconds(5)), BrokenPromise);
+}
+
+TEST(ViaFunc, liftsVoid) {
+  ManualExecutor x;
+  int count = 0;
+  Future<Unit> f = via(&x, [&]{ count++; });
+
+  EXPECT_EQ(0, count);
+  x.run();
+  EXPECT_EQ(1, count);
+}
+
+TEST(ViaFunc, value) {
+  ManualExecutor x;
+  EXPECT_EQ(42, via(&x, []{ return 42; }).getVia(&x));
+}
+
+TEST(ViaFunc, exception) {
+  ManualExecutor x;
+  EXPECT_THROW(
+    via(&x, []() -> int { throw std::runtime_error("expected"); })
+      .getVia(&x),
+    std::runtime_error);
+}
+
+TEST(ViaFunc, future) {
+  ManualExecutor x;
+  EXPECT_EQ(42, via(&x, []{ return makeFuture(42); })
+            .getVia(&x));
+}
+
+TEST(ViaFunc, voidFuture) {
+  ManualExecutor x;
+  int count = 0;
+  via(&x, [&]{ count++; }).getVia(&x);
+  EXPECT_EQ(1, count);
+}
+
+TEST(ViaFunc, isSticky) {
+  ManualExecutor x;
+  int count = 0;
+
+  auto f = via(&x, [&]{ count++; });
+  x.run();
+
+  f.then([&]{ count++; });
+  EXPECT_EQ(1, count);
+  x.run();
+  EXPECT_EQ(2, count);
+}
+
+TEST(ViaFunc, moveOnly) {
+  ManualExecutor x;
+  auto intp = std::make_unique<int>(42);
+
+  EXPECT_EQ(42, via(&x, [intp = std::move(intp)] { return *intp; }).getVia(&x));
+}