(wangle) Use an atomic_flag to make FutureObject threadsafe
authorHans Fugal <fugalh@fb.com>
Thu, 3 Apr 2014 15:48:29 +0000 (08:48 -0700)
committerSara Golemon <sgolemon@fb.com>
Fri, 4 Apr 2014 02:54:40 +0000 (19:54 -0700)
Summary:
There are two operations on `FutureObject` (the backing object that both `Future` and `Promise` front), which are not threadsafe currently. Those are `setContinuation` and `fulfil`. They're not threadsafe because they have a race condition between checking to see whether they should run the continuation and the other side making its modification. This diff introduces a `std::atomic_flag` variable to avoid the race and make these threadsafe.

NB Making `Future`/`Promise` threadsafe does not make this pattern safe:

f1.via(...).then(...).then(...)

(In a hypothetical world where `Future::via` is another name for `executeWithSameThread`)
There is a race between the future from `via` and the call of `then` (and between the first and second `then`s), so the `then`s could execute in the far thread as intended, or they could execute in the current thread (probably not what was intended). This diff does not solve that semantic problem of which thread this runs in. But it does make it safer, in that all those continuations will always execute, just maybe not in the thread you had intended.

You may ask what is the benefit if that semantic problem still exists? That's up for debate. I think the safety is worth it: at least everything a n00b throws at it will *work*. The question of thread semantics can iterate. If we end up with thread semantics that really don't need locks (e.g. maybe because we move to an Rx/Later style "launch" at the end of a chain) we can always toss this atomic.

How does this affect performance? I'm doing some experiments and going to write up my findings. Early results indicate that outlier performance is impacted but not by a lot, but don't count those chickens until I hatch them. Part of the reason for sending this diff out is to run windtunnel experiments.

Test Plan: tests, benchmark, close inspection

Reviewed By: davejwatson@fb.com

FB internal diff: D1241822

folly/wangle/Future-inl.h
folly/wangle/Future.h
folly/wangle/Later-inl.h
folly/wangle/Later.h
folly/wangle/README.md
folly/wangle/detail.h

index 6b5907dc7693a86cb2fcad34b1f7f257a2fb6521..0e19cc7993ddc7b926e5a00be8fec4e72b2b8f07 100644 (file)
@@ -186,7 +186,7 @@ Try<T>& Future<T>::getTry() {
 
 template <class T>
 template <typename Executor>
-inline Future<T> Future<T>::executeWithSameThread(Executor* executor) {
+inline Future<T> Future<T>::via(Executor* executor) {
   throwIfInvalid();
 
   folly::MoveWrapper<Promise<T>> p;
index 3b13b194bdfef95178c53ac3a4ce45dc4d6adbbf..441d1f5a453235481751c9a91777b7641876f5e4 100644 (file)
@@ -58,8 +58,23 @@ class Future {
   typename std::add_lvalue_reference<const T>::type
   value() const;
 
+  /// Returns a future which will call back on the other side of executor.
+  ///
+  ///   f.via(e).then(a); // safe
+  ///
+  ///   f.via(e).then(a).then(b); // faux pas
+  ///
+  /// a will definitely execute in the intended thread, but b may execute
+  /// either in that thread, or in the current thread. If you need to
+  /// guarantee where b executes, use a Later.
   template <typename Executor>
-  Future<T> executeWithSameThread(Executor* executor);
+  Future<T> via(Executor* executor);
+
+  /// Deprecated alias for via
+  template <typename Executor>
+  Future<T> executeWithSameThread(Executor* executor) {
+    return via(executor);
+  }
 
   /**
      Thread-safe version of executeWith
@@ -71,6 +86,8 @@ class Future {
      Instead, you may pass in a Promise so that we can set up
      the rest of the chain in advance, without any racey
      modifications of the continuation
+
+     Deprecated. Use a Later.
    */
   template <typename Executor>
   void executeWith(Executor* executor, Promise<T>&& cont_promise);
index e20b6f6fc89e26db201fd17676379d5934a90df6..aaba5c94395d79be032a16dbe2ad6ee7e6839628 100644 (file)
@@ -143,10 +143,4 @@ Future<T> Later<T>::launch() {
   return std::move(*future_);
 }
 
-template <class T>
-void Later<T>::fireAndForget() {
-  future_->setContinuation([] (Try<T>&& t) {}); // detach
-  starter_.setValue();
-}
-
 }}
index 0d2088cf93ecb279f2f6fd0dbacc68d6b8a79392..0f58ff3c848300dd4998c1549651ec2b764112bb 100644 (file)
@@ -150,8 +150,7 @@ class Later {
    * The future returned will be fulfilled in the last executor.
    *
    * Thread safety issues of Futures still apply. If you want to wait on the
-   * Future, it must be done in the thread that will fulfil it. If you do not
-   * plan to use the result of the Future, use fireAndForget()
+   * Future, it must be done in the thread that will fulfil it.
    */
   Future<T> launch();
 
@@ -159,8 +158,10 @@ class Later {
    * Same as launch, only no Future is returned. This guarantees thread safe
    * cleanup of the internal Futures, even if the Later completes in a different
    * thread than the thread that calls fireAndForget().
+   *
+   * Deprecated. Use launch()
    */
-  void fireAndForget();
+  void fireAndForget() { launch(); }
 
  private:
   Promise<void> starter_;
index f51dc21092aed20874148d26c7ea4d0c571c94b0..4b793330e4826337168b8c8dd54ea5dbc03833c0 100644 (file)
@@ -167,32 +167,55 @@ Using `then` to add continuations is idiomatic. It brings all the code into one
 
 Up to this point we have skirted around the matter of waiting for Futures. You may never need to wait for a Future, because your code is event-driven and all follow-up action happens in a then-block. But if want to have a batch workflow, where you initiate a batch of asynchronous operations and then wait for them all to finish at a synchronization point, then you will want to wait for a Future.
 
-Other future frameworks like Finagle and std::future/boost::future, give you the ability to wait directly on a Future, by calling `fut.wait()` (naturally enough). Wangle has diverged from this pattern for performance reasons. It turns out, making things threadsafe slows them down.  Whodathunkit? So Wangle Futures (and Promises, for you API developers) are not threadsafe. Yes, you heard right, and it should give you pause—what good is an *asynchronous* framework that isn't threadsafe? Well, actually, in an event-driven architecture there's still quite a bit of value, but that doesn't really answer the question. Wangle is, indeed, meant to be used in multithreaded environments. It's just that we move synchronization out of the Future/Promise pair and instead require explicit synchronization or (preferably) crossing thread boundaries with a form of message passing. It turns out that `then()` chaining is really handy and there are often many Futures chained together. But there is often only one thread boundary to cross.  We choose to pay the threadsafety overhead only at that thread boundary.
+Other future frameworks like Finagle and std::future/boost::future, give you the ability to wait directly on a Future, by calling `fut.wait()` (naturally enough). Wangle has diverged from this pattern because we don't want to be in the business of dictating how your thread waits. We may work out something that we feel is sufficiently general, in the meantime adapt this spin loop to however your thread should wait:
 
-Wangle provides a mechanism to make this easy, called a `ThreadGate`. ThreadGates stand between two threads and pass messages (actually, functors) back and forth in a threadsafe manner. Let's work an example. Assume that `MemcacheClient::get()` is not thread-aware. It registers with libevent and tries to send a request, and then later in your event loop when it has successfully sent and received a reply it will complete the Future. But it doesn't even consider that there might be other threads in your program.  Now consider that `get()` calls should happen in an IO thread (the *east* thread) and user code is happening in a user thread (the *west* thread).  A ThreadGate will allow us to do this:
+  while (!f.isReady()) {}
+
+(Hint: you might want to use an event loop or a semaphore or something. You probably don't want to just spin like this.)
+
+Wangle is partially threadsafe. A Promise or Future can migrate between threads as long as there's a full memory barrier of some sort. `Future::then` and `Promise::setValue` (and all variants that boil down to those two calls) can be called from different threads. BUT, be warned that you might be surprised about which thread your callback executes on. Let's consider an example.
 
 ```C++
-Future<GetReply> threadsafeGet(std::string key) {
-  std::function<Future<GetReply>()> doEast = [=]() {
-    return mc_->get(key);
-  };
-  auto westFuture = gate_.add(doEast);
-  return westFuture;
-}
+// Thread A
+Promise<void> p;
+auto f = p.getFuture();
+
+// Thread B
+f.then(x).then(y).then(z);
+
+// Thread A
+p.setValue();
 ```
 
-Think of the ThreadGate as a pair of queues: from west to east we queue some functor that returns a Future, and then the ThreadGate conveys the result back from east to west and eventually fulfils the west Future. But when? The ThreadGate has to be *driven* from both sides—the IO thread has to pull work off the west-to-east queue, and the user thread has to drive the east-to-west queue. Sometimes this happens in the course of an event loop, as it would in a libevent architecture. Other times it has to be explicit, in which case you would call the gate's `makeProgress()` method. Or, if you know which Future you want to wait for you can use:
+This is legal and technically threadsafe. However, it is important to realize that you do not know in which thread `x`, `y`, and/or `z` will execute. Maybe they will execute in Thread A when `p.setValue()` is called. Or, maybe they will execute in Thread B when `f.then` is called. Or, maybe `x` will execute in Thread B, but `y` and/or `z` will execute in Thread A. There's a race between `setValue` and `then`—whichever runs last will execute the callback. The only guarantee is that one of them will run the callback.
 
+Naturally, you will want some control over which thread executes callbacks. We have a few mechanisms to help.
+
+The first and most useful is `Later`, which behaves like a Future but nothing starts executing until `launch` is called. Thus you avoid the race condition when setting up the multithreaded workflow. 
 ```C++
-gate_.waitFor(fut);
+Later<void>()
+  .then(x)
+  .via(e1).then(y1).then(y2)
+  .via(e2).then(z)
+  .launch();
 ```
+`x` will execute in the current thread (the one calling `launch`). `y1` and `y2` will execute in the thread on the other side of `e1`, and `z` will execute in the thread on the other side of `e2`. `y1` and `y2` will execute on the same thread, whichever thread that is. If `e1` and `e2` execute in different threads than the current thread, then the final callback does not happen in the current thread. If you want to get back to the current thread, you need to get there via an executor.
 
-The ThreadGate interface is simple, so any kind of threadsafe functor conveyance you can dream up that is perfect for your application can be implemented. Or, you can use `GenericThreadGate` and `Executor`s to make one out of existing pieces. The `ManualThreadGate` is worth a look as well, especially for unit tests.
+The second and most basic is `Future::via(Executor*)`, which creates a future which will execute its callback via the given executor. i.e. given `f.via(e).then(x)`, `x` will always execute via executor `e`. NB given `f.via(e).then(x).then(y)`, `y` is *not* guaranteed to execute via `e` or in the same thread as `x` (use a Later).
 
-In practice, the API will probably do the gating for you, e.g. `MemcacheClient::get()` would return an already-gated Future and provide a `waitFor()` proxy that lets you wait for Futures it has returned. Then as a user you never have to worry about it. But this exposition was to explain the probably-surprising design decision to make Futures not threadsafe and not support direct waiting.
+TODO implement `Future::then(callback, executor)` so we can do the above with a single Future.
 
-`Later` is another approach to crossing thread boundaries that can be more flexible than ThreadGates.
-(TODO document `Later` here.)
+The third and least flexible (but sometimes very useful) method assumes only two threads and that you want to do something in the far thread, then come back to the current thread. `ThreadGate` is an interface for a bidirectional gateway between two threads. It's usually easier to use a Later, but ThreadGate can be more efficient, and if the pattern is used often in your code it can be more convenient.
+```C++
+// Using a ThreadGate (which has two executors xe and xw)
+tg.gate(a).then(b);
+
+// Using Later
+makeFuture()
+  .via(xe).then(a)
+  .via(xw).then(b)
+  .launch();
+```
 
 ## You make me Promises, Promises
 
@@ -250,7 +273,6 @@ See also http://www.open-std.org/jtc1/sc22/wg21/docs/papers/2012/n3428.pdf
 - 1.53 is brand new, and not in fbcode
 - It's still a bit buggy/bleeding-edge
 - They haven't fleshed out the threading model very well yet, e.g. every single `then` currently spawns a new thread unless you explicitly ask it to work on this thread only, and there is no support for executors yet.
-- boost::future has locking which isn't necessary in our cooperative-multithreaded use case, and assumed to be very expensive.
 
 ### Why use heap-allocated shared state? Why is Promise not a subclass of Future?
 C++. It boils down to wanting to return a Future by value for performance (move semantics and compiler optimizations), and programmer sanity, and needing a reference to the shared state by both the user (which holds the Future) and the asynchronous operation (which holds the Promise), and allowing either to go out of scope.
@@ -263,10 +285,3 @@ C++ doesn't directly support continuations very well. But there are some ways to
 The tradeoff is memory. Each continuation has a stack, and that stack is usually fixed-size and has to be big enough to support whatever ordinary computation you might want to do on it. So each living continuation requires a relatively large amount of memory. If you know the number of continuations will be small, this might be a good fit. In particular, it might be faster and the code might read cleaner.
 
 Wangle takes the middle road between callback hell and continuations, one which has been trodden and proved useful in other languages. It doesn't claim to be the best model for all situations. Use your tools wisely.
-
-### It's so @!#?'n hard to get the thread safety right
-That's not a question.
-
-Yes, it is hard and so you should use a ThreadGate or Later if you need to do any crossing of thread boundaries. Otherwise you need to be very careful. Especially because in most cases the naïve approach is not threadsafe and naïve testing doesn't expose the race condition.
-
-The careful reader will note that we have only assumed locks to be a terrible performance penalty. We are planning on thoroughly benchmarking locks (and the alternative code patterns that having locks would enable), and if we find locks are Not That Bad™ we might reverse this decision (and fold ThreadGate and Later into Future/Promise).
index b1d6ade42a2b60dfbe0f4543512622a3bef6a6c0..caa6cb723c07f06a14f693e3a4184a46b99c3983 100644 (file)
@@ -50,11 +50,11 @@ class FutureObject {
       throw std::logic_error("setContinuation called twice");
     }
 
-    if (value_.hasValue()) {
-      func(std::move(*value_));
+    continuation_ = std::move(func);
+
+    if (shouldContinue_.test_and_set()) {
+      continuation_(std::move(*value_));
       delete this;
-    } else {
-      continuation_ = std::move(func);
     }
   }
 
@@ -63,11 +63,11 @@ class FutureObject {
       throw std::logic_error("fulfil called twice");
     }
 
-    if (continuation_) {
-      continuation_(std::move(t));
+    value_ = std::move(t);
+
+    if (shouldContinue_.test_and_set()) {
+      continuation_(std::move(*value_));
       delete this;
-    } else {
-      value_ = std::move(t);
     }
   }
 
@@ -88,6 +88,7 @@ class FutureObject {
   }
 
  private:
+  std::atomic_flag shouldContinue_ = ATOMIC_FLAG_INIT;
   folly::Optional<Try<T>> value_;
   std::function<void(Try<T>&&)> continuation_;
 };