(futures) Make executors sticky
authorHans Fugal <fugalh@fb.com>
Fri, 20 Feb 2015 17:03:14 +0000 (09:03 -0800)
committerAlecs King <int@fb.com>
Tue, 3 Mar 2015 03:26:10 +0000 (19:26 -0800)
Summary:
Instead of returning a deactivated future, have `via` just set the executor. Propagate the executor from `then`. This fixes the `via().get()` problem, and has semantics similar to before for `via().then().then()`.

However, the semantics are now slightly different - each `then` goes back through the executor. This adds some overhead and tweaks the semantics (e.g. if the executor is a threadpool it might execute subsequent `then`s in another thread). However, with `futures::chain` recently introduced, and any other convenience methods that you can dream up and make a case for, we can reasonably get the old once-through-the-executor behavior when performance or other concerns demand it. e.g. `via().then(futures::chain(a, b, c))`.

Test Plan: unit tests

Reviewed By: hannesr@fb.com

Subscribers: zeus-diffs@, mmandal, steveo, rituraj, trunkagent, exa, folly-diffs@, yfeldblum, jsedgwick, davejwatson

FB internal diff: D1839691

Tasks: 6048744

Signature: t1:1839691:1424397180:ca0b0ea7b3867769ab8abd254a510059df67011e

folly/futures/Future-inl.h
folly/futures/Future.h
folly/futures/detail/Core.h
folly/futures/test/FutureTest.cpp
folly/futures/test/ViaTest.cpp

index a8454155f518c25a82173ed9a49bf595ef9b452e..d307127976b990e658cbb98176f54788416d9bbd 100644 (file)
@@ -115,6 +115,9 @@ Future<T>::thenImplementation(F func, detail::argResult<isTry, F, Args...>) {
 
   // grab the Future now before we lose our handle on the Promise
   auto f = p->getFuture();
+  if (getExecutor()) {
+    f.setExecutor(getExecutor());
+  }
 
   /* This is a bit tricky.
 
@@ -179,6 +182,9 @@ Future<T>::thenImplementation(F func, detail::argResult<isTry, F, Args...>) {
 
   // grab the Future now before we lose our handle on the Promise
   auto f = p->getFuture();
+  if (getExecutor()) {
+    f.setExecutor(getExecutor());
+  }
 
   setCallback_(
     [p, funcm](Try<T>&& t) mutable {
@@ -318,8 +324,7 @@ template <typename Executor>
 inline Future<T> Future<T>::via(Executor* executor) && {
   throwIfInvalid();
 
-  this->deactivate();
-  core_->setExecutor(executor);
+  setExecutor(executor);
 
   return std::move(*this);
 }
index 1ac7e652d19eb426fb0749cf62bb5b7db5e4c0a5..3459a7ad4d988c1eeeb6f390c2034de856e4868e 100644 (file)
@@ -448,7 +448,7 @@ class Future {
   /// Overload of waitVia() for rvalue Futures
   Future<T>&& waitVia(DrivableExecutor* e) &&;
 
- private:
+ protected:
   typedef detail::Core<T>* corePtr;
 
   // shared core state object
@@ -462,6 +462,7 @@ class Future {
   void throwIfInvalid() const;
 
   friend class Promise<T>;
+  template <class> friend class Future;
 
   // Variant: returns a value
   // e.g. f.then([](Try<T> t){ return t.value(); });
@@ -474,6 +475,9 @@ class Future {
   template <typename F, typename R, bool isTry, typename... Args>
   typename std::enable_if<R::ReturnsFuture::value, typename R::Return>::type
   thenImplementation(F func, detail::argResult<isTry, F, Args...>);
+
+  Executor* getExecutor() { return core_->getExecutor(); }
+  void setExecutor(Executor* x) { core_->setExecutor(x); }
 };
 
 /**
index 77205ec088db40a2d46e4fae5389c6133fcec470..4891d4551655ea0d9de49fad0039fc9fe4c1fd25 100644 (file)
@@ -178,7 +178,7 @@ class Core {
 
   /// Called by a destructing Future (in the Future thread, by definition)
   void detachFuture() {
-    activate();
+    activateNoDeprecatedWarning();
     detachOne();
   }
 
@@ -193,14 +193,13 @@ class Core {
   }
 
   /// May call from any thread
-  void deactivate() {
+  void deactivate() DEPRECATED {
     active_ = false;
   }
 
   /// May call from any thread
-  void activate() {
-    active_ = true;
-    maybeCallback();
+  void activate() DEPRECATED {
+    activateNoDeprecatedWarning();
   }
 
   /// May call from any thread
@@ -211,6 +210,10 @@ class Core {
     executor_ = x;
   }
 
+  Executor* getExecutor() {
+    return executor_;
+  }
+
   /// Call only from Future thread
   void raise(exception_wrapper e) {
     std::lock_guard<decltype(interruptLock_)> guard(interruptLock_);
@@ -234,7 +237,12 @@ class Core {
     }
   }
 
- private:
+ protected:
+  void activateNoDeprecatedWarning() {
+    active_ = true;
+    maybeCallback();
+  }
+
   void maybeCallback() {
     FSM_START(fsm_)
       case State::Armed:
index 532095f2f048afde4e1a6bf5efcf42b6a549fbc9..23c72f8ccd19aac33a864a82ddec54a6103b0534 100644 (file)
@@ -1135,7 +1135,7 @@ TEST(Future, waitVia) {
   {
     // try rvalue as well
     ManualExecutor x;
-    auto f = via(&x).activate().then().waitVia(&x);
+    auto f = via(&x).then().waitVia(&x);
     EXPECT_TRUE(f.isReady());
   }
 
@@ -1146,53 +1146,6 @@ TEST(Future, waitVia) {
   }
 }
 
-TEST(Future, callbackAfterActivate) {
-  Promise<void> p;
-  auto f = p.getFuture();
-  f.deactivate();
-
-  size_t count = 0;
-  f.then([&](Try<void>&&) { count++; });
-
-  p.setValue();
-  EXPECT_EQ(0, count);
-
-  f.activate();
-  EXPECT_EQ(1, count);
-}
-
-TEST(Future, activateOnDestruct) {
-  auto f = std::make_shared<Future<void>>(makeFuture());
-  f->deactivate();
-
-  size_t count = 0;
-  f->then([&](Try<void>&&) { count++; });
-  EXPECT_EQ(0, count);
-
-  f.reset();
-  EXPECT_EQ(1, count);
-}
-
-TEST(Future, viaActsCold) {
-  ManualExecutor x;
-  size_t count = 0;
-
-  auto fv = via(&x);
-  fv.then([&](Try<void>&&) { count++; });
-
-  EXPECT_EQ(0, count);
-
-  fv.activate();
-
-  EXPECT_EQ(1, x.run());
-  EXPECT_EQ(1, count);
-}
-
-TEST(Future, viaIsCold) {
-  ManualExecutor x;
-  EXPECT_FALSE(via(&x).isActive());
-}
-
 TEST(Future, viaRaces) {
   ManualExecutor x;
   Promise<void> p;
@@ -1216,35 +1169,6 @@ TEST(Future, viaRaces) {
   t2.join();
 }
 
-// TODO(#4920689)
-TEST(Future, viaRaces_2stage) {
-  ManualExecutor x;
-  Promise<void> p;
-  auto tid = std::this_thread::get_id();
-  bool done = false;
-
-  std::thread t1([&] {
-    auto f2 = p.getFuture().via(&x);
-    f2.then([&](Try<void>&&) { EXPECT_EQ(tid, std::this_thread::get_id()); })
-      .then([&](Try<void>&&) { EXPECT_EQ(tid, std::this_thread::get_id()); })
-      .then([&](Try<void>&&) { done = true; });
-
-    // the bug was in the promise being fulfilled before f2 is reactivated. we
-    // could sleep, but yielding should cause this to fail with reasonable
-    // probability
-    std::this_thread::yield();
-    f2.activate();
-  });
-
-  std::thread t2([&] {
-    p.setValue();
-  });
-
-  while (!done) x.run();
-  t1.join();
-  t2.join();
-}
-
 TEST(Future, getFuture_after_setValue) {
   Promise<int> p;
   p.setValue(42);
index d203c18c211eb630f97d359877a6ebb24776b54f..5580d8bd88af1d04f020372cda821d814e88d8b6 100644 (file)
@@ -47,9 +47,9 @@ struct ViaFixture : public testing::Test {
     done(false)
   {
     t = std::thread([=] {
-      ManualWaiter eastWaiter(eastExecutor);
-      while (!done)
-        eastWaiter.drive();
+        ManualWaiter eastWaiter(eastExecutor);
+        while (!done)
+          eastWaiter.drive();
       });
   }
 
@@ -92,8 +92,7 @@ TEST(Via, then_future) {
   auto future = makeFuture(1)
     .then([](Try<int>&& t) {
       return makeFuture(t.value() == 1);
-    })
-    ;
+    });
   EXPECT_TRUE(future.value());
 }
 
@@ -120,27 +119,6 @@ TEST(Via, then_function) {
   EXPECT_EQ(f.value(), "start;static;class-static;class");
 }
 
-TEST_F(ViaFixture, deactivateChain) {
-  bool flag = false;
-  auto f = makeFuture().deactivate();
-  EXPECT_FALSE(f.isActive());
-  auto f2 = f.then([&](Try<void>){ flag = true; });
-  EXPECT_FALSE(flag);
-}
-
-TEST_F(ViaFixture, deactivateActivateChain) {
-  bool flag = false;
-  // you can do this all day long with temporaries.
-  auto f1 = makeFuture().deactivate().activate().deactivate();
-  // Chaining on activate/deactivate requires an rvalue, so you have to move
-  // one of these two ways (if you're not using a temporary).
-  auto f2 = std::move(f1).activate();
-  f2.deactivate();
-  auto f3 = std::move(f2.activate());
-  f3.then([&](Try<void>){ flag = true; });
-  EXPECT_TRUE(flag);
-}
-
 TEST_F(ViaFixture, thread_hops) {
   auto westThreadId = std::this_thread::get_id();
   auto f = via(eastExecutor.get()).then([=](Try<void>&& t) {
@@ -156,22 +134,28 @@ TEST_F(ViaFixture, thread_hops) {
 
 TEST_F(ViaFixture, chain_vias) {
   auto westThreadId = std::this_thread::get_id();
-  auto f = via(eastExecutor.get()).then([=](Try<void>&& t) {
+  auto f = via(eastExecutor.get()).then([=]() {
     EXPECT_NE(std::this_thread::get_id(), westThreadId);
-    return makeFuture<int>(1);
-  }).then([=](Try<int>&& t) {
-    int val = t.value();
-    return makeFuture(std::move(val)).via(westExecutor.get())
-      .then([=](Try<int>&& t) mutable {
+    return 1;
+  }).then([=](int val) {
+    return makeFuture(val).via(westExecutor.get())
+      .then([=](int val) mutable {
         EXPECT_EQ(std::this_thread::get_id(), westThreadId);
-        return t.value();
+        return val + 1;
       });
-  }).then([=](Try<int>&& t) {
+  }).then([=](int val) {
+    // even though ultimately the future that triggers this one executed in
+    // the west thread, this then() inherited the executor from its
+    // predecessor, ie the eastExecutor.
+    EXPECT_NE(std::this_thread::get_id(), westThreadId);
+    return val + 1;
+  }).via(westExecutor.get()).then([=](int val) {
+    // go back to west, so we can wait on it
     EXPECT_EQ(std::this_thread::get_id(), westThreadId);
-    return t.value();
+    return val + 1;
   });
 
-  EXPECT_EQ(f.getVia(waiter.get()), 1);
+  EXPECT_EQ(f.getVia(waiter.get()), 4);
 }
 
 TEST_F(ViaFixture, bareViaAssignment) {