(Wangle) Make via behave more like gate
authorHans Fugal <fugalh@fb.com>
Tue, 30 Sep 2014 22:52:52 +0000 (15:52 -0700)
committerDave Watson <davejwatson@fb.com>
Tue, 30 Sep 2014 23:17:18 +0000 (16:17 -0700)
Summary:
Could the problem be that via continues the existing chain of futures,
whereas we actually want to start a new chain?

Is there any particular reason this wasn't implemented like this originally?

Test Plan:
Ran all the unit tests. I hope to try to reproduce the thread issue and
see if this improves things.

Reviewed By: davejwatson@fb.com

Subscribers: trunkagent, net-systems@, exa, njormrod, fugalh

FB internal diff: D1500225

Tasks: 4920689

folly/wangle/Future-inl.h
folly/wangle/Future.h
folly/wangle/detail/State.h
folly/wangle/test/FutureTest.cpp

index 9a0fe9974ad74652156f2a2ffc4768b3a0ea50f3..f404067b8afab3a39176f4addc2885933c303a63 100644 (file)
@@ -193,14 +193,41 @@ template <class T>
 template <typename Executor>
 inline Future<T> Future<T>::via(Executor* executor) {
   throwIfInvalid();
-  auto f = then([=](Try<T>&& t) {
-    MoveWrapper<Promise<T>> promise;
+  MoveWrapper<Promise<T>> promise;
+
+  auto f = promise->getFuture();
+  // We are obligated to return a cold future.
+  f.deactivate();
+  // But we also need to make this one cold for via to at least work some of
+  // the time. (see below)
+  deactivate();
+
+  then([=](Try<T>&& t) mutable {
     MoveWrapper<Try<T>> tw(std::move(t));
-    auto f2 = promise->getFuture();
+    // There is a race here.
+    // When the promise is fulfilled, and the future is still inactive, when
+    // the future is activated (e.g. by destruction) the callback will happen
+    // in that context, not in the intended context (which has already left
+    // the building).
+    //
+    // Currently, this will work fine because all the temporaries are
+    // destructed in an order that is compatible with this implementation:
+    //
+    //   makeFuture().via(x).then(a).then(b);
+    //
+    // However, this will not work reliably:
+    //
+    //   auto f2 = makeFuture().via(x);
+    //   f2.then(a).then(b);
+    //
+    // Because the first temporary is destructed on the first line, and the
+    // executor is fed. But by the time f2 is destructed, the executor
+    // may have already fulfilled the promise on the other thread.
+    //
+    // TODO(#4920689) fix it
     executor->add([=]() mutable { promise->fulfilTry(std::move(*tw)); });
-    return f2;
   });
-  f.deactivate();
+
   return f;
 }
 
index e9ee2fb2d280f6359d53ba685d520111d54a8101..df30efc5adb2acaa32d32d80bb062252c04d2128 100644 (file)
@@ -204,6 +204,9 @@ class Future {
   void deactivate() {
     state_->deactivate();
   }
+  bool isActive() {
+    return state_->isActive();
+  }
 
  private:
   typedef detail::State<T>* statePtr;
index 491c38faafc71caec3d3625f029c944f71c29705..0f7f4bf1fd628a9a4f9b54fe8829ebceea3b19bd 100644 (file)
@@ -139,11 +139,13 @@ class State {
     maybeCallback();
   }
 
+  bool isActive() { return active_; }
+
  private:
   void maybeCallback() {
     std::lock_guard<decltype(mutex_)> lock(mutex_);
     if (!calledBack_ &&
-        value_ && callback_ && active_) {
+        value_ && callback_ && isActive()) {
       // TODO we should probably try/catch here
       callback_(std::move(*value_));
       calledBack_ = true;
index 1f0fad3c005317d3ac38cbe641c609a3cc3e054d..0f045af3ae1cfe5ed9a95f6ca7f8e630c84ace98 100644 (file)
@@ -764,7 +764,7 @@ TEST(Future, activateOnDestruct) {
   EXPECT_EQ(1, count);
 }
 
-TEST(Future, viaIsCold) {
+TEST(Future, viaActsCold) {
   ManualExecutor x;
   size_t count = 0;
 
@@ -779,6 +779,63 @@ TEST(Future, viaIsCold) {
   EXPECT_EQ(1, count);
 }
 
+TEST(Future, viaIsCold) {
+  ManualExecutor x;
+  EXPECT_FALSE(makeFuture().via(&x).isActive());
+}
+
+TEST(Future, viaRaces) {
+  ManualExecutor x;
+  Promise<void> p;
+  auto tid = std::this_thread::get_id();
+  bool done = false;
+
+  std::thread t1([&] {
+    p.getFuture()
+      .via(&x)
+      .then([&](Try<void>&&) { EXPECT_EQ(tid, std::this_thread::get_id()); })
+      .then([&](Try<void>&&) { EXPECT_EQ(tid, std::this_thread::get_id()); })
+      .then([&](Try<void>&&) { done = true; });
+  });
+
+  std::thread t2([&] {
+    p.setValue();
+  });
+
+  while (!done) x.run();
+  t1.join();
+  t2.join();
+}
+
+// TODO(#4920689)
+TEST(Future, DISABLED_viaRaces_2stage) {
+  ManualExecutor x;
+  Promise<void> p;
+  auto tid = std::this_thread::get_id();
+  bool done = false;
+
+  std::thread t1([&] {
+    auto f2 = p.getFuture().via(&x);
+    f2.then([&](Try<void>&&) { EXPECT_EQ(tid, std::this_thread::get_id()); })
+      .then([&](Try<void>&&) { EXPECT_EQ(tid, std::this_thread::get_id()); })
+      .then([&](Try<void>&&) { done = true; });
+
+    // the bug was in the promise being fulfilled before f2 is reactivated. we
+    // could sleep, but yielding should cause this to fail with reasonable
+    // probability
+    std::this_thread::yield();
+    f2.activate();
+  });
+
+  std::thread t2([&] {
+    p.setValue();
+  });
+
+  while (!done) x.run();
+  t1.join();
+  t2.join();
+}
+
 TEST(Future, getFuture_after_setValue) {
   Promise<int> p;
   p.setValue(42);