From 73e3273b9f2f2f5686e2b6ff4a87d572f2391585 Mon Sep 17 00:00:00 2001 From: Hans Fugal Date: Mon, 30 Jun 2014 17:29:05 -0700 Subject: [PATCH] (wangle) cold via Summary: Instead of returning a Later, `via` returns a cold future. This works without keeping a backreference like Later does, because an inactive Future will always activate on destruction. Alternatively we could have an extra Promise, a la Later, and pass that along like Later does, and require launch() at the end (though, implicit launching on destruction would be an option there too). If you think this approach is viable I'll clean it up on Wednesday: make sure all the calling sites work, etc. Test Plan: new unit test This may fail in contbuild, I haven't done the codemod for calling sites, if there are any. Reviewed By: hannesr@fb.com Subscribers: jsedgwick, net-systems@, fugalh, exa FB internal diff: D1412499 Tasks: 4480567 --- folly/wangle/Future-inl.h | 12 +++++- folly/wangle/Future.h | 42 ++++++++++++++----- folly/wangle/Later.h | 69 ++++++++++++++++++-------------- folly/wangle/README.md | 29 +++++++++----- folly/wangle/ThreadGate.h | 24 ++++++----- folly/wangle/detail/State.h | 18 ++++++++- folly/wangle/test/FutureTest.cpp | 46 +++++++++++++++++++++ folly/wangle/test/LaterTest.cpp | 20 --------- 8 files changed, 179 insertions(+), 81 deletions(-) diff --git a/folly/wangle/Future-inl.h b/folly/wangle/Future-inl.h index 1e17e7cb..a8f5b94b 100644 --- a/folly/wangle/Future-inl.h +++ b/folly/wangle/Future-inl.h @@ -188,9 +188,17 @@ Try& Future::getTry() { template template -inline Later Future::via(Executor* executor) { +inline Future Future::via(Executor* executor) { throwIfInvalid(); - return Later(std::move(*this)).via(executor); + auto f = then([=](Try&& t) { + MoveWrapper> promise; + MoveWrapper> tw(std::move(t)); + auto f = promise->getFuture(); + executor->add([=]() mutable { promise->fulfilTry(std::move(*tw)); }); + return f; + }); + f.deactivate(); + return f; } template diff --git a/folly/wangle/Future.h b/folly/wangle/Future.h index e468eebe..0e99bb6a 100644 --- a/folly/wangle/Future.h +++ b/folly/wangle/Future.h @@ -30,7 +30,6 @@ namespace folly { namespace wangle { template struct isFuture; -template class Later; template class Future { @@ -60,13 +59,27 @@ class Future { typename std::add_lvalue_reference::type value() const; - /// Returns a Later which will call back on the other side of executor. + /// Returns an inactive Future which will call back on the other side of + /// executor (when it is activated). /// - /// f.via(e).then(a).then(b).launch(); + /// NB remember that Futures activate when they destruct. This is good, + /// it means that this will work: /// - /// a and b will execute in the same context (the far side of e) + /// f.via(e).then(a).then(b); + /// + /// a and b will execute in the same context (the far side of e), because + /// the Future (temporary variable) created by via(e) does not call back + /// until it destructs, which is after then(a) and then(b) have been wired + /// up. + /// + /// But this is still racy: + /// + /// f = f.via(e).then(a); + /// f.then(b); + /// + /// If you need something like that, use a Later. template - Later via(Executor* executor); + Future via(Executor* executor); /** True when the result (or exception) is ready. */ bool isReady() const; @@ -85,9 +98,9 @@ class Future { throws, the exception will be captured in the Future that is returned. */ /* TODO n3428 and other async frameworks have something like then(scheduler, - Future), we probably want to support a similar API (instead of - via. or rather, via should return a cold future (Later) and we provide - then(scheduler, Future) ). */ + Future), we might want to support a similar API which could be + implemented a little more efficiently than + f.via(executor).then(callback) */ template typename std::enable_if< !isFuture&&)>::type>::value, @@ -180,6 +193,18 @@ class Future { template void setCallback_(F&& func); + /// A Future's callback is executed when all three of these conditions have + /// become true: it has a value (set by the Promise), it has a callback (set + /// by then), and it is active (active by default). + /// + /// Inactive Futures will activate upon destruction. + void activate() { + state_->activate(); + } + void deactivate() { + state_->deactivate(); + } + private: typedef detail::State* statePtr; @@ -315,4 +340,3 @@ Future waitWithSemaphore(Future&& f, Duration timeout); }} // folly::wangle #include "Future-inl.h" -#include "Later.h" diff --git a/folly/wangle/Later.h b/folly/wangle/Later.h index 84430135..d1693175 100644 --- a/folly/wangle/Later.h +++ b/folly/wangle/Later.h @@ -26,16 +26,42 @@ template struct isLaterOrFuture; template struct isLater; /* - * Since wangle primitives (promise/future) are not thread safe, it is difficult - * to build complex asynchronous workflows. A Later allows you to build such a - * workflow before actually launching it so that callbacks can be set in a - * threadsafe manner. + * Later is like a cold Future, but makes it easier to avoid triggering until + * later, because it must be triggered explicitly. An equivalence example will + * help differentiate: * - * The interface to add additional work is the same as future: a then() method - * that takes a function that can return either a type T, a Future, or a - * Later + * Later later = + * Later(std::move(foo)) + * .then(cb1) + * .via(ex1) + * .then(cb2) + * .then(cb3) + * .via(ex2) + * .then(cb4) + * .then(cb5); + * ... + * later.launch(); * - * Thread transitions are done by using executors and calling the via() method. + * Future coldFuture = makeFuture(std::move(foo)); + * coldFuture.deactivate(); + * coldFuture + * .then(cb1) + * .via(ex1) + * .then(cb2) + * .then(cb3) + * .via(ex2) + * .then(cb4) + * .then(cb5); + * ... + * coldFuture.activate(); + * + * Using a Later means you don't have to grab a handle to the first Future and + * deactivate it. + * + * Later used to be a workaround to the thread-unsafe nature of Future + * chaining, but that has changed and there is no need to use Later if your + * only goal is to traverse thread boundaries with executors. In that case, + * just use Future::via(). * * Here is an example of a workflow: * @@ -49,14 +75,6 @@ template struct isLater; * .via(serverExecutor) * .then([=]Try&& t) { return sendClientResponse(t.value()); }) * .launch(); - * - * Although this workflow traverses many threads, we are able to string - * continuations together in a threadsafe manner. - * - * Laters can also be used to wrap preexisting asynchronous modules that were - * not built with wangle in mind. You can create a Later with a function that - * takes a callback as input. The function will not actually be called until - * launch(), allowing you to string then() statements on top of the callback. */ template class Later { @@ -88,9 +106,9 @@ class Later { /* * This constructor is used to wrap a pre-existing cob-style asynchronous api - * so that it can be used in wangle in a threadsafe manner. wangle provides - * the callback to this pre-existing api, and this callback will fulfill a - * promise so as to incorporate this api into the workflow. + * so that it can be used in wangle. wangle provides the callback to this + * pre-existing api, and this callback will fulfill a promise so as to + * incorporate this api into the workflow. * * Example usage: * @@ -101,6 +119,7 @@ class Later { * addAsync(1, 2, std::move(fn)); * }); */ + // TODO we should implement a makeFuture-ish with this pattern too, now. template ::value>::type, class = typename std::enable_if::value>::type> @@ -129,8 +148,7 @@ class Later { * be chained to the new Later before launching the new Later. * * This can be used to build asynchronous modules that can be called from a - * user thread and completed in a callback thread. Callbacks can be set up - * ahead of time without thread safety issues. + * user thread and completed in a callback thread. * * Using the Later(std::function)>&& fn) * constructor, you can wrap existing asynchronous modules with a Later and @@ -153,20 +171,13 @@ class Later { * called in the executor provided in the constructor. Subsequent then() * calls will be made, potentially changing threads if a via() call is made. * The future returned will be fulfilled in the last executor. - * - * Thread safety issues of Futures still apply. If you want to wait on the - * Future, it must be done in the thread that will fulfil it. */ Future launch(); /* - * Same as launch, only no Future is returned. This guarantees thread safe - * cleanup of the internal Futures, even if the Later completes in a different - * thread than the thread that calls fireAndForget(). - * * Deprecated. Use launch() */ - void fireAndForget() { launch(); } + void fireAndForget() __attribute__ ((deprecated)) { launch(); } private: Promise starter_; diff --git a/folly/wangle/README.md b/folly/wangle/README.md index 662a1ea0..9580db00 100644 --- a/folly/wangle/README.md +++ b/folly/wangle/README.md @@ -191,28 +191,39 @@ This is legal and technically threadsafe. However, it is important to realize th Naturally, you will want some control over which thread executes callbacks. We have a few mechanisms to help. -The first and most useful is `Later`, which behaves like a Future but nothing starts executing until `launch` is called. Thus you avoid the race condition when setting up the multithreaded workflow. +The first and most useful is `via`, which passes execution through an `Executor`, which usually has the effect of running the callback in a new thread. ```C++ -Later() +aFuture .then(x) .via(e1).then(y1).then(y2) - .via(e2).then(z) - .launch(); + .via(e2).then(z); ``` -`x` will execute in the current thread (the one calling `launch`). `y1` and `y2` will execute in the thread on the other side of `e1`, and `z` will execute in the thread on the other side of `e2`. `y1` and `y2` will execute on the same thread, whichever thread that is. If `e1` and `e2` execute in different threads than the current thread, then the final callback does not happen in the current thread. If you want to get back to the current thread, you need to get there via an executor. +`x` will execute in the current thread. `y1` and `y2` will execute in the thread on the other side of `e1`, and `z` will execute in the thread on the other side of `e2`. `y1` and `y2` will execute on the same thread, whichever thread that is. If `e1` and `e2` execute in different threads than the current thread, then the final callback does not happen in the current thread. If you want to get back to the current thread, you need to get there via an executor. -`Future::via(Executor*)` will return a Later, too. +This works because `via` returns a deactivated ("cold") Future, which blocks the propagation of callbacks until it is activated. Activation happens either explicitly (`activate`) or implicitly when the Future returned by `via` is destructed. In this example, there is no ambiguity about in which context any of the callbacks happen (including `y2`), because propagation is blocked at the `via` callsites until after everything is wired up (temporaries are destructed after the calls to `then` have completed). + +You can still have a race after `via` if you break it into multiple statements, e.g. in this counterexample: +```C++ +f = f.via(e1).then(y1).then(y2); // nothing racy here +f2.then(y3); // racy +``` +If you want more control over the delayed execution, check out `Later`. +```C++ +Later later; +later = later.via(e1).then(y1).then(y2); // nothing racy here +later = later.then(y3); // nor here +later.launch(); // explicit launch +``` The third and least flexible (but sometimes very useful) method assumes only two threads and that you want to do something in the far thread, then come back to the current thread. `ThreadGate` is an interface for a bidirectional gateway between two threads. It's usually easier to use a Later, but ThreadGate can be more efficient, and if the pattern is used often in your code it can be more convenient. ```C++ // Using a ThreadGate (which has two executors xe and xw) tg.gate(a).then(b); -// Using Later +// Using via makeFuture() .via(xe).then(a) - .via(xw).then(b) - .launch(); + .via(xw).then(b); ``` ## You make me Promises, Promises diff --git a/folly/wangle/ThreadGate.h b/folly/wangle/ThreadGate.h index ef8c06af..d7b5124d 100644 --- a/folly/wangle/ThreadGate.h +++ b/folly/wangle/ThreadGate.h @@ -21,16 +21,21 @@ namespace folly { namespace wangle { /** - Yo dawg, I heard you like asynchrony so I put asynchrony in your asynchronous - framework. + The ThreadGate strategy encapsulates a bidirectional gate via two Executors, + kind of like a stargate for wangle Future chains. Its implementation is + slightly more efficient then using Future::via in both directions, and if + the pattern is common it can be more convenient (although the overhead of + setting up a ThreadGate is less convenient in most situations). - Wangle's futures and promises are not thread safe. Counterintuitive as this - may seem at first, this is very intentional. Making futures and promises - threadsafe drastically reduces their performance. + // Using a ThreadGate (which has two executors xe and xw) + tg.gate(a).then(b); - On the other hand, an asynchronous framework isn't much use if you can't do - asynchronous things in other threads. So we use the ThreadGate strategy to - decouple the threads and their futures with a form of message passing. + // Using via + makeFuture() + .via(xe).then(a) + .via(xw).then(b); + + If you're not sure whether you want a ThreadGate, you don't. Use via. There are two actors, the east thread which does the asynchronous operation (the server) and the west thread that wants the asynchronous operation done @@ -67,9 +72,6 @@ namespace folly { namespace wangle { Future change toward a multithreaded architecture easier, as you need only change the components of the ThreadGate which your client code is already using. - - Later (in Later.h) is an alternative mechanism for thread-traversing - asynchronous workflows. */ class ThreadGate { public: diff --git a/folly/wangle/detail/State.h b/folly/wangle/detail/State.h index 39b6a7f2..4c78d956 100644 --- a/folly/wangle/detail/State.h +++ b/folly/wangle/detail/State.h @@ -109,6 +109,7 @@ class State { if (!callback_) { setCallback([](Try&&) {}); } + activate(); detachOne(); } @@ -120,10 +121,24 @@ class State { detachOne(); } + void deactivate() { + std::lock_guard lock(mutex_); + active_ = false; + } + + void activate() { + { + std::lock_guard lock(mutex_); + active_ = true; + } + maybeCallback(); + } + private: void maybeCallback() { std::lock_guard lock(mutex_); - if (value_ && callback_) { + if (!calledBack_ && + value_ && callback_ && active_) { // TODO we should probably try/catch here callback_(std::move(*value_)); calledBack_ = true; @@ -150,6 +165,7 @@ class State { std::function&&)> callback_; bool calledBack_ = false; unsigned char detached_ = 0; + bool active_ = true; // this lock isn't meant to protect all accesses to members, only the ones // that need to be threadsafe: the act of setting value_ and callback_, and diff --git a/folly/wangle/test/FutureTest.cpp b/folly/wangle/test/FutureTest.cpp index cbf76a15..df5341de 100644 --- a/folly/wangle/test/FutureTest.cpp +++ b/folly/wangle/test/FutureTest.cpp @@ -25,6 +25,7 @@ #include #include #include +#include using namespace folly::wangle; using std::pair; @@ -742,3 +743,48 @@ TEST(Future, waitWithSemaphoreForTime) { EXPECT_TRUE(t.isReady()); } } + +TEST(Future, callbackAfterActivate) { + Promise p; + auto f = p.getFuture(); + f.deactivate(); + + size_t count = 0; + f.then([&](Try&&) { count++; }); + + p.setValue(); + EXPECT_EQ(0, count); + + f.activate(); + EXPECT_EQ(1, count); +} + +TEST(Future, activateOnDestruct) { + Promise p; + auto f = p.getFuture(); + f.deactivate(); + + size_t count = 0; + f.then([&](Try&&) { count++; }); + + p.setValue(); + EXPECT_EQ(0, count); + + f = makeFuture(); // force destruction of old f + EXPECT_EQ(1, count); +} + +TEST(Future, viaIsCold) { + ManualExecutor x; + size_t count = 0; + + auto fv = makeFuture().via(&x); + fv.then([&](Try&&) { count++; }); + + EXPECT_EQ(0, count); + + fv.activate(); + + EXPECT_EQ(1, x.run()); + EXPECT_EQ(1, count); +} diff --git a/folly/wangle/test/LaterTest.cpp b/folly/wangle/test/LaterTest.cpp index 5a00cb5e..bdc03832 100644 --- a/folly/wangle/test/LaterTest.cpp +++ b/folly/wangle/test/LaterTest.cpp @@ -159,23 +159,3 @@ TEST_F(LaterFixture, chain_laters) { } EXPECT_EQ(future.value(), 1); } - -TEST_F(LaterFixture, fire_and_forget) { - auto west = westExecutor.get(); - later.via(eastExecutor.get()).then([=](Try&& t) { - west->add([]() {}); - }).fireAndForget(); - waiter->makeProgress(); -} - -TEST(Later, FutureViaReturnsLater) { - ManualExecutor x; - { - Future f = makeFuture(); - Later l = f.via(&x); - } - { - Future f = makeFuture(42); - Later l = f.via(&x); - } -} -- 2.34.1