(wangle) fix a race condition in Core::maybeCallback
authorHans Fugal <fugalh@fb.com>
Wed, 5 Nov 2014 23:44:06 +0000 (15:44 -0800)
committerPavlo Kushnir <pavlo@fb.com>
Sat, 8 Nov 2014 02:38:49 +0000 (18:38 -0800)
Summary:
`calledBack_` could be seen as true by both threads in this conditional. Classic rookie mistake. :-/

Test Plan: run unit tests

Reviewed By: darshan@fb.com

Subscribers: trunkagent, hannesr, net-systems@, fugalh, exa, njormrod, folly-diffs@

FB internal diff: D1661199

Tasks: 55429385506504

Signature: t1:1661199:1415215840:fb69f56c8cf6f59beeca809724ce015b5260d9ad

Blame Revision: D1636487

folly/wangle/detail/Core.h
folly/wangle/test/FutureTest.cpp

index bae0c8707e803e5861730eb0845770a7dc3ed21f..1113e0b2411373cd4efb55065b47efdde2a07813 100644 (file)
@@ -193,18 +193,19 @@ class Core : protected FSM<State> {
  private:
   void maybeCallback() {
     assert(ready());
-    if (!calledBack_ && isActive() && callback_) {
-      // TODO(5306911) we should probably try/catch
-      calledBack_ = true;
-      Executor* x = executor_;
-
-      RequestContext::setContext(context_);
-      if (x) {
-        MoveWrapper<std::function<void(Try<T>&&)>> cb(std::move(callback_));
-        MoveWrapper<folly::Optional<Try<T>>> val(std::move(result_));
-        x->add([cb, val]() mutable { (*cb)(std::move(**val)); });
-      } else {
-        callback_(std::move(*result_));
+    if (isActive() && callback_) {
+      if (!calledBack_.exchange(true)) {
+        // TODO(5306911) we should probably try/catch
+        Executor* x = executor_;
+
+        RequestContext::setContext(context_);
+        if (x) {
+          MoveWrapper<std::function<void(Try<T>&&)>> cb(std::move(callback_));
+          MoveWrapper<folly::Optional<Try<T>>> val(std::move(result_));
+          x->add([cb, val]() mutable { (*cb)(std::move(**val)); });
+        } else {
+          callback_(std::move(*result_));
+        }
       }
     }
   }
index 1ecca30ceda73e040316fd5ff62b27b2f8e7dad9..db15993c2b25c0db2ba55fd75d5327ffa5ca11dd 100644 (file)
@@ -27,6 +27,7 @@
 #include <folly/wangle/Executor.h>
 #include <folly/wangle/Future.h>
 #include <folly/wangle/ManualExecutor.h>
+#include <folly/MPMCQueue.h>
 
 #include <folly/io/async/Request.h>
 
@@ -40,6 +41,43 @@ using std::vector;
 #define EXPECT_TYPE(x, T) \
   EXPECT_TRUE((std::is_same<decltype(x), T>::value))
 
+/// Simple executor that does work in another thread
+class ThreadExecutor : public Executor {
+  folly::MPMCQueue<Func> funcs;
+  std::atomic<bool> done {false};
+  std::thread worker;
+  folly::Baton<> baton;
+
+  void work() {
+    baton.post();
+    Func fn;
+    while (!done) {
+      while (!funcs.isEmpty()) {
+        funcs.blockingRead(fn);
+        fn();
+      }
+    }
+  }
+
+ public:
+  ThreadExecutor(size_t n = 1024)
+    : funcs(n), worker(std::bind(&ThreadExecutor::work, this)) {}
+
+  ~ThreadExecutor() {
+    done = true;
+    funcs.write([]{});
+    worker.join();
+  }
+
+  void add(Func fn) override {
+    funcs.blockingWrite(std::move(fn));
+  }
+
+  void waitForStartup() {
+    baton.wait();
+  }
+};
+
 typedef WangleException eggs_t;
 static eggs_t eggs("eggs");
 
@@ -950,3 +988,30 @@ TEST(Future, context) {
   // Fulfil the promise
   p.setValue();
 }
+
+
+// This only fails about 1 in 1k times when the bug is present :(
+TEST(Future, t5506504) {
+  ThreadExecutor x;
+
+  auto fn = [&x]{
+    auto promises = std::make_shared<vector<Promise<void>>>(4);
+    vector<Future<void>> futures;
+
+    for (auto& p : *promises) {
+      futures.emplace_back(
+        p.getFuture()
+        .via(&x)
+        .then([](Try<void>&&){}));
+    }
+
+    x.waitForStartup();
+    x.add([promises]{
+      for (auto& p : *promises) p.setValue();
+    });
+
+    return whenAll(futures.begin(), futures.end());
+  };
+
+  waitWithSemaphore(fn());
+}