(wangle) cold via
authorHans Fugal <fugalh@fb.com>
Tue, 1 Jul 2014 00:29:05 +0000 (17:29 -0700)
committerTudor Bosman <tudorb@fb.com>
Mon, 7 Jul 2014 15:42:22 +0000 (08:42 -0700)
Summary:
Instead of returning a Later, `via` returns a cold future.
This works without keeping a backreference like Later does, because an inactive Future will always activate on destruction. Alternatively we could have an extra Promise, a la Later, and pass that along like Later does, and require launch() at the end (though, implicit launching on destruction would be an option there too).

If you think this approach is viable I'll clean it up on Wednesday: make sure all the calling sites work, etc.

Test Plan:
new unit test
This may fail in contbuild, I haven't done the codemod for calling sites, if there are any.

Reviewed By: hannesr@fb.com

Subscribers: jsedgwick, net-systems@, fugalh, exa

FB internal diff: D1412499

Tasks: 4480567

folly/wangle/Future-inl.h
folly/wangle/Future.h
folly/wangle/Later.h
folly/wangle/README.md
folly/wangle/ThreadGate.h
folly/wangle/detail/State.h
folly/wangle/test/FutureTest.cpp
folly/wangle/test/LaterTest.cpp

index 1e17e7cb40d4c3c8769a7caa35fe0200df40b6c2..a8f5b94bb1da2d162981b43d9fbacd521f00d70f 100644 (file)
@@ -188,9 +188,17 @@ Try<T>& Future<T>::getTry() {
 
 template <class T>
 template <typename Executor>
-inline Later<T> Future<T>::via(Executor* executor) {
+inline Future<T> Future<T>::via(Executor* executor) {
   throwIfInvalid();
-  return Later<T>(std::move(*this)).via(executor);
+  auto f = then([=](Try<T>&& t) {
+    MoveWrapper<Promise<T>> promise;
+    MoveWrapper<Try<T>> tw(std::move(t));
+    auto f = promise->getFuture();
+    executor->add([=]() mutable { promise->fulfilTry(std::move(*tw)); });
+    return f;
+  });
+  f.deactivate();
+  return f;
 }
 
 template <class T>
index e468eebe6cb5c5edea3df2a619199e33b137184e..0e99bb6ac9c4506d6be31828f0d3564057ea1370 100644 (file)
@@ -30,7 +30,6 @@
 namespace folly { namespace wangle {
 
 template <typename T> struct isFuture;
-template <class> class Later;
 
 template <class T>
 class Future {
@@ -60,13 +59,27 @@ class Future {
   typename std::add_lvalue_reference<const T>::type
   value() const;
 
-  /// Returns a Later which will call back on the other side of executor.
+  /// Returns an inactive Future which will call back on the other side of
+  /// executor (when it is activated).
   ///
-  ///   f.via(e).then(a).then(b).launch();
+  /// NB remember that Futures activate when they destruct. This is good,
+  /// it means that this will work:
   ///
-  /// a and b will execute in the same context (the far side of e)
+  ///   f.via(e).then(a).then(b);
+  ///
+  /// a and b will execute in the same context (the far side of e), because
+  /// the Future (temporary variable) created by via(e) does not call back
+  /// until it destructs, which is after then(a) and then(b) have been wired
+  /// up.
+  ///
+  /// But this is still racy:
+  ///
+  ///   f = f.via(e).then(a);
+  ///   f.then(b);
+  ///
+  /// If you need something like that, use a Later.
   template <typename Executor>
-  Later<T> via(Executor* executor);
+  Future<T> via(Executor* executor);
 
   /** True when the result (or exception) is ready. */
   bool isReady() const;
@@ -85,9 +98,9 @@ class Future {
     throws, the exception will be captured in the Future that is returned.
     */
   /* TODO n3428 and other async frameworks have something like then(scheduler,
-     Future), we probably want to support a similar API (instead of
-     via. or rather, via should return a cold future (Later) and we provide
-     then(scheduler, Future) ). */
+     Future), we might want to support a similar API which could be
+     implemented a little more efficiently than
+     f.via(executor).then(callback) */
   template <class F>
   typename std::enable_if<
     !isFuture<typename std::result_of<F(Try<T>&&)>::type>::value,
@@ -180,6 +193,18 @@ class Future {
   template <class F>
   void setCallback_(F&& func);
 
+  /// A Future's callback is executed when all three of these conditions have
+  /// become true: it has a value (set by the Promise), it has a callback (set
+  /// by then), and it is active (active by default).
+  ///
+  /// Inactive Futures will activate upon destruction.
+  void activate() {
+    state_->activate();
+  }
+  void deactivate() {
+    state_->deactivate();
+  }
+
  private:
   typedef detail::State<T>* statePtr;
 
@@ -315,4 +340,3 @@ Future<T> waitWithSemaphore(Future<T>&& f, Duration timeout);
 }} // folly::wangle
 
 #include "Future-inl.h"
-#include "Later.h"
index 844301354c42f773b8e4f114b005ad361744fafc..d169317544c0a395e543323c566e1b7c480c0be2 100644 (file)
@@ -26,16 +26,42 @@ template <typename T> struct isLaterOrFuture;
 template <typename T> struct isLater;
 
 /*
- * Since wangle primitives (promise/future) are not thread safe, it is difficult
- * to build complex asynchronous workflows. A Later allows you to build such a
- * workflow before actually launching it so that callbacks can be set in a
- * threadsafe manner.
+ * Later is like a cold Future, but makes it easier to avoid triggering until
+ * later, because it must be triggered explicitly. An equivalence example will
+ * help differentiate:
  *
- * The interface to add additional work is the same as future: a then() method
- * that takes a function that can return either a type T, a Future<T>, or a
- * Later<T>
+ *   Later<Foo> later =
+ *     Later<Foo>(std::move(foo))
+ *     .then(cb1)
+ *     .via(ex1)
+ *     .then(cb2)
+ *     .then(cb3)
+ *     .via(ex2)
+ *     .then(cb4)
+ *     .then(cb5);
+ *   ...
+ *   later.launch();
  *
- * Thread transitions are done by using executors and calling the via() method.
+ *   Future<Foo> coldFuture = makeFuture(std::move(foo));
+ *   coldFuture.deactivate();
+ *   coldFuture
+ *     .then(cb1)
+ *     .via(ex1)
+ *     .then(cb2)
+ *     .then(cb3)
+ *     .via(ex2)
+ *     .then(cb4)
+ *     .then(cb5);
+ *   ...
+ *   coldFuture.activate();
+ *
+ * Using a Later means you don't have to grab a handle to the first Future and
+ * deactivate it.
+ *
+ * Later used to be a workaround to the thread-unsafe nature of Future
+ * chaining, but that has changed and there is no need to use Later if your
+ * only goal is to traverse thread boundaries with executors. In that case,
+ * just use Future::via().
  *
  * Here is an example of a workflow:
  *
@@ -49,14 +75,6 @@ template <typename T> struct isLater;
  *   .via(serverExecutor)
  *   .then([=]Try<DiskResponse>&& t) { return sendClientResponse(t.value()); })
  *   .launch();
- *
- * Although this workflow traverses many threads, we are able to string
- * continuations together in a threadsafe manner.
- *
- * Laters can also be used to wrap preexisting asynchronous modules that were
- * not built with wangle in mind. You can create a Later with a function that
- * takes a callback as input. The function will not actually be called until
- * launch(), allowing you to string then() statements on top of the callback.
  */
 template <class T>
 class Later {
@@ -88,9 +106,9 @@ class Later {
 
   /*
    * This constructor is used to wrap a pre-existing cob-style asynchronous api
-   * so that it can be used in wangle in a threadsafe manner. wangle provides
-   * the callback to this pre-existing api, and this callback will fulfill a
-   * promise so as to incorporate this api into the workflow.
+   * so that it can be used in wangle. wangle provides the callback to this
+   * pre-existing api, and this callback will fulfill a promise so as to
+   * incorporate this api into the workflow.
    *
    * Example usage:
    *
@@ -101,6 +119,7 @@ class Later {
    *   addAsync(1, 2, std::move(fn));
    * });
    */
+  // TODO we should implement a makeFuture-ish with this pattern too, now.
   template <class U,
             class = typename std::enable_if<!std::is_void<U>::value>::type,
             class = typename std::enable_if<std::is_same<T, U>::value>::type>
@@ -129,8 +148,7 @@ class Later {
    * be chained to the new Later before launching the new Later.
    *
    * This can be used to build asynchronous modules that can be called from a
-   * user thread and completed in a callback thread. Callbacks can be set up
-   * ahead of time without thread safety issues.
+   * user thread and completed in a callback thread.
    *
    * Using the Later(std::function<void(std::function<void(T&&)>)>&& fn)
    * constructor, you can wrap existing asynchronous modules with a Later and
@@ -153,20 +171,13 @@ class Later {
    * called in the executor provided in the constructor. Subsequent then()
    * calls will be made, potentially changing threads if a via() call is made.
    * The future returned will be fulfilled in the last executor.
-   *
-   * Thread safety issues of Futures still apply. If you want to wait on the
-   * Future, it must be done in the thread that will fulfil it.
    */
   Future<T> launch();
 
   /*
-   * Same as launch, only no Future is returned. This guarantees thread safe
-   * cleanup of the internal Futures, even if the Later completes in a different
-   * thread than the thread that calls fireAndForget().
-   *
    * Deprecated. Use launch()
    */
-  void fireAndForget() { launch(); }
+  void fireAndForget() __attribute__ ((deprecated)) { launch(); }
 
  private:
   Promise<void> starter_;
index 662a1ea07e500cf3d5c91c30f0bf5bdd9c81a778..9580db004dd1f4a5347f607bd356c06300093f35 100644 (file)
@@ -191,28 +191,39 @@ This is legal and technically threadsafe. However, it is important to realize th
 
 Naturally, you will want some control over which thread executes callbacks. We have a few mechanisms to help.
 
-The first and most useful is `Later`, which behaves like a Future but nothing starts executing until `launch` is called. Thus you avoid the race condition when setting up the multithreaded workflow. 
+The first and most useful is `via`, which passes execution through an `Executor`, which usually has the effect of running the callback in a new thread.
 ```C++
-Later<void>()
+aFuture
   .then(x)
   .via(e1).then(y1).then(y2)
-  .via(e2).then(z)
-  .launch();
+  .via(e2).then(z);
 ```
-`x` will execute in the current thread (the one calling `launch`). `y1` and `y2` will execute in the thread on the other side of `e1`, and `z` will execute in the thread on the other side of `e2`. `y1` and `y2` will execute on the same thread, whichever thread that is. If `e1` and `e2` execute in different threads than the current thread, then the final callback does not happen in the current thread. If you want to get back to the current thread, you need to get there via an executor.
+`x` will execute in the current thread. `y1` and `y2` will execute in the thread on the other side of `e1`, and `z` will execute in the thread on the other side of `e2`. `y1` and `y2` will execute on the same thread, whichever thread that is. If `e1` and `e2` execute in different threads than the current thread, then the final callback does not happen in the current thread. If you want to get back to the current thread, you need to get there via an executor.
 
-`Future::via(Executor*)` will return a Later, too.
+This works because `via` returns a deactivated ("cold") Future, which blocks the propagation of callbacks until it is activated. Activation happens either explicitly (`activate`) or implicitly when the Future returned by `via` is destructed. In this example, there is no ambiguity about in which context any of the callbacks happen (including `y2`), because propagation is blocked at the `via` callsites until after everything is wired up (temporaries are destructed after the calls to `then` have completed).
+
+You can still have a race after `via` if you break it into multiple statements, e.g. in this counterexample:
+```C++
+f = f.via(e1).then(y1).then(y2); // nothing racy here
+f2.then(y3); // racy
+```
+If you want more control over the delayed execution, check out `Later`.
+```C++
+Later<void> later;
+later = later.via(e1).then(y1).then(y2); // nothing racy here
+later = later.then(y3); // nor here
+later.launch(); // explicit launch
+```
 
 The third and least flexible (but sometimes very useful) method assumes only two threads and that you want to do something in the far thread, then come back to the current thread. `ThreadGate` is an interface for a bidirectional gateway between two threads. It's usually easier to use a Later, but ThreadGate can be more efficient, and if the pattern is used often in your code it can be more convenient.
 ```C++
 // Using a ThreadGate (which has two executors xe and xw)
 tg.gate(a).then(b);
 
-// Using Later
+// Using via
 makeFuture()
   .via(xe).then(a)
-  .via(xw).then(b)
-  .launch();
+  .via(xw).then(b);
 ```
 
 ## You make me Promises, Promises
index ef8c06af24300a65d2f75240fee58864c0189eb1..d7b5124daeb34964a45baca93a95c499b20758ec 100644 (file)
 namespace folly { namespace wangle {
 
 /**
-  Yo dawg, I heard you like asynchrony so I put asynchrony in your asynchronous
-  framework.
+  The ThreadGate strategy encapsulates a bidirectional gate via two Executors,
+  kind of like a stargate for wangle Future chains. Its implementation is
+  slightly more efficient then using Future::via in both directions, and if
+  the pattern is common it can be more convenient (although the overhead of
+  setting up a ThreadGate is less convenient in most situations).
 
-  Wangle's futures and promises are not thread safe. Counterintuitive as this
-  may seem at first, this is very intentional. Making futures and promises
-  threadsafe drastically reduces their performance.
+    // Using a ThreadGate (which has two executors xe and xw)
+    tg.gate(a).then(b);
 
-  On the other hand, an asynchronous framework isn't much use if you can't do
-  asynchronous things in other threads. So we use the ThreadGate strategy to
-  decouple the threads and their futures with a form of message passing.
+    // Using via
+    makeFuture()
+      .via(xe).then(a)
+      .via(xw).then(b);
+
+  If you're not sure whether you want a ThreadGate, you don't. Use via.
 
   There are two actors, the east thread which does the asynchronous operation
   (the server) and the west thread that wants the asynchronous operation done
@@ -67,9 +72,6 @@ namespace folly { namespace wangle {
   Future change toward a multithreaded architecture easier, as you need only
   change the components of the ThreadGate which your client code is already
   using.
-
-  Later (in Later.h) is an alternative mechanism for thread-traversing
-  asynchronous workflows.
   */
 class ThreadGate {
 public:
index 39b6a7f2c4b573695c4a083e687260e617099c52..4c78d956f589ffc66f3b8cf3008541c675b12ed3 100644 (file)
@@ -109,6 +109,7 @@ class State {
     if (!callback_) {
       setCallback([](Try<T>&&) {});
     }
+    activate();
     detachOne();
   }
 
@@ -120,10 +121,24 @@ class State {
     detachOne();
   }
 
+  void deactivate() {
+    std::lock_guard<std::mutex> lock(mutex_);
+    active_ = false;
+  }
+
+  void activate() {
+    {
+      std::lock_guard<std::mutex> lock(mutex_);
+      active_ = true;
+    }
+    maybeCallback();
+  }
+
  private:
   void maybeCallback() {
     std::lock_guard<std::mutex> lock(mutex_);
-    if (value_ && callback_) {
+    if (!calledBack_ &&
+        value_ && callback_ && active_) {
       // TODO we should probably try/catch here
       callback_(std::move(*value_));
       calledBack_ = true;
@@ -150,6 +165,7 @@ class State {
   std::function<void(Try<T>&&)> callback_;
   bool calledBack_ = false;
   unsigned char detached_ = 0;
+  bool active_ = true;
 
   // this lock isn't meant to protect all accesses to members, only the ones
   // that need to be threadsafe: the act of setting value_ and callback_, and
index cbf76a152bca300460637b2eedee81788bf1df42..df5341de38c1d1b0a982e956bb377168277a9f7f 100644 (file)
@@ -25,6 +25,7 @@
 #include <unistd.h>
 #include <folly/wangle/Executor.h>
 #include <folly/wangle/Future.h>
+#include <folly/wangle/ManualExecutor.h>
 
 using namespace folly::wangle;
 using std::pair;
@@ -742,3 +743,48 @@ TEST(Future, waitWithSemaphoreForTime) {
   EXPECT_TRUE(t.isReady());
  }
 }
+
+TEST(Future, callbackAfterActivate) {
+  Promise<void> p;
+  auto f = p.getFuture();
+  f.deactivate();
+
+  size_t count = 0;
+  f.then([&](Try<void>&&) { count++; });
+
+  p.setValue();
+  EXPECT_EQ(0, count);
+
+  f.activate();
+  EXPECT_EQ(1, count);
+}
+
+TEST(Future, activateOnDestruct) {
+  Promise<void> p;
+  auto f = p.getFuture();
+  f.deactivate();
+
+  size_t count = 0;
+  f.then([&](Try<void>&&) { count++; });
+
+  p.setValue();
+  EXPECT_EQ(0, count);
+
+  f = makeFuture(); // force destruction of old f
+  EXPECT_EQ(1, count);
+}
+
+TEST(Future, viaIsCold) {
+  ManualExecutor x;
+  size_t count = 0;
+
+  auto fv = makeFuture().via(&x);
+  fv.then([&](Try<void>&&) { count++; });
+
+  EXPECT_EQ(0, count);
+
+  fv.activate();
+
+  EXPECT_EQ(1, x.run());
+  EXPECT_EQ(1, count);
+}
index 5a00cb5ef509cf604662e75fca19295c4f586bda..bdc03832cc10cc5a743f0245144b5c5a1e4ebf52 100644 (file)
@@ -159,23 +159,3 @@ TEST_F(LaterFixture, chain_laters) {
   }
   EXPECT_EQ(future.value(), 1);
 }
-
-TEST_F(LaterFixture, fire_and_forget) {
-  auto west = westExecutor.get();
-  later.via(eastExecutor.get()).then([=](Try<void>&& t) {
-    west->add([]() {});
-  }).fireAndForget();
-  waiter->makeProgress();
-}
-
-TEST(Later, FutureViaReturnsLater) {
-  ManualExecutor x;
-  {
-    Future<void> f = makeFuture();
-    Later<void> l = f.via(&x);
-  }
-  {
-    Future<int> f = makeFuture(42);
-    Later<int> l = f.via(&x);
-  }
-}