From: Hans Fugal Date: Fri, 20 Feb 2015 17:03:14 +0000 (-0800) Subject: (futures) Make executors sticky X-Git-Tag: v0.27.0~26 X-Git-Url: http://plrg.eecs.uci.edu/git/?a=commitdiff_plain;h=fc6f0a5fcaf2f3a08395d73e8cef955b00fe7c5d;p=folly.git (futures) Make executors sticky Summary: Instead of returning a deactivated future, have `via` just set the executor. Propagate the executor from `then`. This fixes the `via().get()` problem, and has semantics similar to before for `via().then().then()`. However, the semantics are now slightly different - each `then` goes back through the executor. This adds some overhead and tweaks the semantics (e.g. if the executor is a threadpool it might execute subsequent `then`s in another thread). However, with `futures::chain` recently introduced, and any other convenience methods that you can dream up and make a case for, we can reasonably get the old once-through-the-executor behavior when performance or other concerns demand it. e.g. `via().then(futures::chain(a, b, c))`. Test Plan: unit tests Reviewed By: hannesr@fb.com Subscribers: zeus-diffs@, mmandal, steveo, rituraj, trunkagent, exa, folly-diffs@, yfeldblum, jsedgwick, davejwatson FB internal diff: D1839691 Tasks: 6048744 Signature: t1:1839691:1424397180:ca0b0ea7b3867769ab8abd254a510059df67011e --- diff --git a/folly/futures/Future-inl.h b/folly/futures/Future-inl.h index a8454155..d3071279 100644 --- a/folly/futures/Future-inl.h +++ b/folly/futures/Future-inl.h @@ -115,6 +115,9 @@ Future::thenImplementation(F func, detail::argResult) { // grab the Future now before we lose our handle on the Promise auto f = p->getFuture(); + if (getExecutor()) { + f.setExecutor(getExecutor()); + } /* This is a bit tricky. @@ -179,6 +182,9 @@ Future::thenImplementation(F func, detail::argResult) { // grab the Future now before we lose our handle on the Promise auto f = p->getFuture(); + if (getExecutor()) { + f.setExecutor(getExecutor()); + } setCallback_( [p, funcm](Try&& t) mutable { @@ -318,8 +324,7 @@ template inline Future Future::via(Executor* executor) && { throwIfInvalid(); - this->deactivate(); - core_->setExecutor(executor); + setExecutor(executor); return std::move(*this); } diff --git a/folly/futures/Future.h b/folly/futures/Future.h index 1ac7e652..3459a7ad 100644 --- a/folly/futures/Future.h +++ b/folly/futures/Future.h @@ -448,7 +448,7 @@ class Future { /// Overload of waitVia() for rvalue Futures Future&& waitVia(DrivableExecutor* e) &&; - private: + protected: typedef detail::Core* corePtr; // shared core state object @@ -462,6 +462,7 @@ class Future { void throwIfInvalid() const; friend class Promise; + template friend class Future; // Variant: returns a value // e.g. f.then([](Try t){ return t.value(); }); @@ -474,6 +475,9 @@ class Future { template typename std::enable_if::type thenImplementation(F func, detail::argResult); + + Executor* getExecutor() { return core_->getExecutor(); } + void setExecutor(Executor* x) { core_->setExecutor(x); } }; /** diff --git a/folly/futures/detail/Core.h b/folly/futures/detail/Core.h index 77205ec0..4891d455 100644 --- a/folly/futures/detail/Core.h +++ b/folly/futures/detail/Core.h @@ -178,7 +178,7 @@ class Core { /// Called by a destructing Future (in the Future thread, by definition) void detachFuture() { - activate(); + activateNoDeprecatedWarning(); detachOne(); } @@ -193,14 +193,13 @@ class Core { } /// May call from any thread - void deactivate() { + void deactivate() DEPRECATED { active_ = false; } /// May call from any thread - void activate() { - active_ = true; - maybeCallback(); + void activate() DEPRECATED { + activateNoDeprecatedWarning(); } /// May call from any thread @@ -211,6 +210,10 @@ class Core { executor_ = x; } + Executor* getExecutor() { + return executor_; + } + /// Call only from Future thread void raise(exception_wrapper e) { std::lock_guard guard(interruptLock_); @@ -234,7 +237,12 @@ class Core { } } - private: + protected: + void activateNoDeprecatedWarning() { + active_ = true; + maybeCallback(); + } + void maybeCallback() { FSM_START(fsm_) case State::Armed: diff --git a/folly/futures/test/FutureTest.cpp b/folly/futures/test/FutureTest.cpp index 532095f2..23c72f8c 100644 --- a/folly/futures/test/FutureTest.cpp +++ b/folly/futures/test/FutureTest.cpp @@ -1135,7 +1135,7 @@ TEST(Future, waitVia) { { // try rvalue as well ManualExecutor x; - auto f = via(&x).activate().then().waitVia(&x); + auto f = via(&x).then().waitVia(&x); EXPECT_TRUE(f.isReady()); } @@ -1146,53 +1146,6 @@ TEST(Future, waitVia) { } } -TEST(Future, callbackAfterActivate) { - Promise p; - auto f = p.getFuture(); - f.deactivate(); - - size_t count = 0; - f.then([&](Try&&) { count++; }); - - p.setValue(); - EXPECT_EQ(0, count); - - f.activate(); - EXPECT_EQ(1, count); -} - -TEST(Future, activateOnDestruct) { - auto f = std::make_shared>(makeFuture()); - f->deactivate(); - - size_t count = 0; - f->then([&](Try&&) { count++; }); - EXPECT_EQ(0, count); - - f.reset(); - EXPECT_EQ(1, count); -} - -TEST(Future, viaActsCold) { - ManualExecutor x; - size_t count = 0; - - auto fv = via(&x); - fv.then([&](Try&&) { count++; }); - - EXPECT_EQ(0, count); - - fv.activate(); - - EXPECT_EQ(1, x.run()); - EXPECT_EQ(1, count); -} - -TEST(Future, viaIsCold) { - ManualExecutor x; - EXPECT_FALSE(via(&x).isActive()); -} - TEST(Future, viaRaces) { ManualExecutor x; Promise p; @@ -1216,35 +1169,6 @@ TEST(Future, viaRaces) { t2.join(); } -// TODO(#4920689) -TEST(Future, viaRaces_2stage) { - ManualExecutor x; - Promise p; - auto tid = std::this_thread::get_id(); - bool done = false; - - std::thread t1([&] { - auto f2 = p.getFuture().via(&x); - f2.then([&](Try&&) { EXPECT_EQ(tid, std::this_thread::get_id()); }) - .then([&](Try&&) { EXPECT_EQ(tid, std::this_thread::get_id()); }) - .then([&](Try&&) { done = true; }); - - // the bug was in the promise being fulfilled before f2 is reactivated. we - // could sleep, but yielding should cause this to fail with reasonable - // probability - std::this_thread::yield(); - f2.activate(); - }); - - std::thread t2([&] { - p.setValue(); - }); - - while (!done) x.run(); - t1.join(); - t2.join(); -} - TEST(Future, getFuture_after_setValue) { Promise p; p.setValue(42); diff --git a/folly/futures/test/ViaTest.cpp b/folly/futures/test/ViaTest.cpp index d203c18c..5580d8bd 100644 --- a/folly/futures/test/ViaTest.cpp +++ b/folly/futures/test/ViaTest.cpp @@ -47,9 +47,9 @@ struct ViaFixture : public testing::Test { done(false) { t = std::thread([=] { - ManualWaiter eastWaiter(eastExecutor); - while (!done) - eastWaiter.drive(); + ManualWaiter eastWaiter(eastExecutor); + while (!done) + eastWaiter.drive(); }); } @@ -92,8 +92,7 @@ TEST(Via, then_future) { auto future = makeFuture(1) .then([](Try&& t) { return makeFuture(t.value() == 1); - }) - ; + }); EXPECT_TRUE(future.value()); } @@ -120,27 +119,6 @@ TEST(Via, then_function) { EXPECT_EQ(f.value(), "start;static;class-static;class"); } -TEST_F(ViaFixture, deactivateChain) { - bool flag = false; - auto f = makeFuture().deactivate(); - EXPECT_FALSE(f.isActive()); - auto f2 = f.then([&](Try){ flag = true; }); - EXPECT_FALSE(flag); -} - -TEST_F(ViaFixture, deactivateActivateChain) { - bool flag = false; - // you can do this all day long with temporaries. - auto f1 = makeFuture().deactivate().activate().deactivate(); - // Chaining on activate/deactivate requires an rvalue, so you have to move - // one of these two ways (if you're not using a temporary). - auto f2 = std::move(f1).activate(); - f2.deactivate(); - auto f3 = std::move(f2.activate()); - f3.then([&](Try){ flag = true; }); - EXPECT_TRUE(flag); -} - TEST_F(ViaFixture, thread_hops) { auto westThreadId = std::this_thread::get_id(); auto f = via(eastExecutor.get()).then([=](Try&& t) { @@ -156,22 +134,28 @@ TEST_F(ViaFixture, thread_hops) { TEST_F(ViaFixture, chain_vias) { auto westThreadId = std::this_thread::get_id(); - auto f = via(eastExecutor.get()).then([=](Try&& t) { + auto f = via(eastExecutor.get()).then([=]() { EXPECT_NE(std::this_thread::get_id(), westThreadId); - return makeFuture(1); - }).then([=](Try&& t) { - int val = t.value(); - return makeFuture(std::move(val)).via(westExecutor.get()) - .then([=](Try&& t) mutable { + return 1; + }).then([=](int val) { + return makeFuture(val).via(westExecutor.get()) + .then([=](int val) mutable { EXPECT_EQ(std::this_thread::get_id(), westThreadId); - return t.value(); + return val + 1; }); - }).then([=](Try&& t) { + }).then([=](int val) { + // even though ultimately the future that triggers this one executed in + // the west thread, this then() inherited the executor from its + // predecessor, ie the eastExecutor. + EXPECT_NE(std::this_thread::get_id(), westThreadId); + return val + 1; + }).via(westExecutor.get()).then([=](int val) { + // go back to west, so we can wait on it EXPECT_EQ(std::this_thread::get_id(), westThreadId); - return t.value(); + return val + 1; }); - EXPECT_EQ(f.getVia(waiter.get()), 1); + EXPECT_EQ(f.getVia(waiter.get()), 4); } TEST_F(ViaFixture, bareViaAssignment) {