add per node mutex for emulated futex
authorNathan Bronson <ngbronson@fb.com>
Thu, 9 Oct 2014 02:44:16 +0000 (19:44 -0700)
committerAndrii Grynenko <andrii@fb.com>
Wed, 15 Oct 2014 00:56:23 +0000 (17:56 -0700)
Summary:
The emulated futex (not used on Linux) has an optimization to defer
notification until after the bucket mutex has been unlocked, to avoid
lock collisions between the waiter and waker.  That code will have a
use-after-free problem if the waiter's condition_variable has a spurious
wakeup, which is allowed by the spec.  That code also doesn't do
anything about contention between multiple waiters.

This diff adds a mutex to each wait node, relieving the waiters from
having to acquire the bucket lock on wakeup.  Rather than trying to
perform a racy late notification, we just make sure to release the node
lock immediately after calling notify_one, which seems to have the
desired effect.

Test Plan:
1) existing unit tests
2) new unit tests

Reviewed By: delong.j@fb.com

Subscribers: boseant, njormrod

FB internal diff: D1602360

folly/detail/Futex.cpp
folly/test/FutexTest.cpp

index 489cec39dbdd2b45b687c5ef0783e492beb99ba8..d18614491162b0e2bdff9751d6ce2d5c400f49bc 100644 (file)
@@ -123,17 +123,24 @@ FutexResult nativeFutexWaitImpl(void* addr,
 ///////////////////////////////////////////////////////
 // compatibility implementation using standard C++ API
 
+// Our emulated futex uses 4096 lists of wait nodes.  There are two levels
+// of locking: a per-list mutex that controls access to the list and a
+// per-node mutex, condvar, and bool that are used for the actual wakeups.
+// The per-node mutex allows us to do precise wakeups without thundering
+// herds.
+
 struct EmulatedFutexWaitNode : public boost::intrusive::list_base_hook<> {
   void* const addr_;
   const uint32_t waitMask_;
-  bool hasTimeout_;
+
+  // tricky: hold both bucket and node mutex to write, either to read
   bool signaled_;
+  std::mutex mutex_;
   std::condition_variable cond_;
 
-  EmulatedFutexWaitNode(void* addr, uint32_t waitMask, bool hasTimeout)
+  EmulatedFutexWaitNode(void* addr, uint32_t waitMask)
     : addr_(addr)
     , waitMask_(waitMask)
-    , hasTimeout_(hasTimeout)
     , signaled_(false)
   {
   }
@@ -162,42 +169,24 @@ std::once_flag EmulatedFutexBucket::gBucketInit;
 
 int emulatedFutexWake(void* addr, int count, uint32_t waitMask) {
   auto& bucket = EmulatedFutexBucket::bucketFor(addr);
+  std::unique_lock<std::mutex> bucketLock(bucket.mutex_);
 
   int numAwoken = 0;
-  boost::intrusive::list<EmulatedFutexWaitNode> deferredWakeups;
-
-  {
-    std::unique_lock<std::mutex> lock(bucket.mutex_);
-
-    for (auto iter = bucket.waiters_.begin();
-         numAwoken < count && iter != bucket.waiters_.end(); ) {
-      auto current = iter;
-      auto& node = *iter++;
-      if (node.addr_ == addr && (node.waitMask_ & waitMask) != 0) {
-        // We unlink, but waiter destroys the node.  We must signal timed
-        // waiters under the lock, to avoid a race where we release the lock,
-        // the waiter times out and deletes the node, and then we try to
-        // signal it.  This problem doesn't exist for unbounded waiters,
-        // so for them we optimize their wakeup by releasing the lock first.
-        bucket.waiters_.erase(current);
-        if (node.hasTimeout_) {
-          node.signaled_ = true;
-          node.cond_.notify_one();
-        } else {
-          deferredWakeups.push_back(node);
-        }
-        ++numAwoken;
-      }
+  for (auto iter = bucket.waiters_.begin();
+       numAwoken < count && iter != bucket.waiters_.end(); ) {
+    auto current = iter;
+    auto& node = *iter++;
+    if (node.addr_ == addr && (node.waitMask_ & waitMask) != 0) {
+      ++numAwoken;
+
+      // we unlink, but waiter destroys the node
+      bucket.waiters_.erase(current);
+
+      std::unique_lock<std::mutex> nodeLock(node.mutex_);
+      node.signaled_ = true;
+      node.cond_.notify_one();
     }
   }
-
-  while (!deferredWakeups.empty()) {
-    auto& node = deferredWakeups.front();
-    deferredWakeups.pop_front();
-    node.signaled_ = true;
-    node.cond_.notify_one();
-  }
-
   return numAwoken;
 }
 
@@ -207,30 +196,39 @@ FutexResult emulatedFutexWaitImpl(
         time_point<system_clock>* absSystemTime,
         time_point<steady_clock>* absSteadyTime,
         uint32_t waitMask) {
-  bool hasTimeout = absSystemTime != nullptr || absSteadyTime != nullptr;
-  EmulatedFutexWaitNode node(addr, waitMask, hasTimeout);
-
   auto& bucket = EmulatedFutexBucket::bucketFor(addr);
-  std::unique_lock<std::mutex> lock(bucket.mutex_);
+  EmulatedFutexWaitNode node(addr, waitMask);
 
-  uint32_t actual;
-  memcpy(&actual, addr, sizeof(uint32_t));
-  if (actual != expected) {
-    return FutexResult::VALUE_CHANGED;
-  }
+  {
+    std::unique_lock<std::mutex> bucketLock(bucket.mutex_);
+
+    uint32_t actual;
+    memcpy(&actual, addr, sizeof(uint32_t));
+    if (actual != expected) {
+      return FutexResult::VALUE_CHANGED;
+    }
+
+    bucket.waiters_.push_back(node);
+  } // bucketLock scope
 
-  bucket.waiters_.push_back(node);
-  while (!node.signaled_) {
-    std::cv_status status = std::cv_status::no_timeout;
-    if (absSystemTime != nullptr) {
-      status = node.cond_.wait_until(lock, *absSystemTime);
-    } else if (absSteadyTime != nullptr) {
-      status = node.cond_.wait_until(lock, *absSteadyTime);
-    } else {
-      node.cond_.wait(lock);
+  std::cv_status status = std::cv_status::no_timeout;
+  {
+    std::unique_lock<std::mutex> nodeLock(node.mutex_);
+    while (!node.signaled_ && status != std::cv_status::timeout) {
+      if (absSystemTime != nullptr) {
+        status = node.cond_.wait_until(nodeLock, *absSystemTime);
+      } else if (absSteadyTime != nullptr) {
+        status = node.cond_.wait_until(nodeLock, *absSteadyTime);
+      } else {
+        node.cond_.wait(nodeLock);
+      }
     }
+  } // nodeLock scope
 
-    if (status == std::cv_status::timeout) {
+  if (status == std::cv_status::timeout) {
+    // it's not really a timeout until we unlink the unsignaled node
+    std::unique_lock<std::mutex> bucketLock(bucket.mutex_);
+    if (!node.signaled_) {
       bucket.waiters_.erase(bucket.waiters_.iterator_to(node));
       return FutexResult::TIMEDOUT;
     }
index c1063cf8a687d12ad9b3cf4740359ef32fe9d2d7..3c5c63d754ee312c87f9f76cf4ceb4316927b363 100644 (file)
@@ -54,36 +54,36 @@ void run_basic_tests() {
   DSched::join(thr);
 }
 
-template<template<typename> class Atom>
-void run_wait_until_tests();
+template <template<typename> class Atom, typename Clock>
+void liveClockWaitUntilTests() {
+  Futex<Atom> f(0);
 
-template <typename Clock>
-void stdAtomicWaitUntilTests() {
-  Futex<std::atomic> f(0);
-
-  auto thrA = DSched::thread([&]{
-    while (true) {
-      typename Clock::time_point nowPlus2s = Clock::now() + seconds(2);
-      auto res = f.futexWaitUntil(0, nowPlus2s);
-      EXPECT_TRUE(res == FutexResult::TIMEDOUT || res == FutexResult::AWOKEN);
-      if (res == FutexResult::AWOKEN) {
-        break;
+  for (int stress = 0; stress < 1000; ++stress) {
+    auto fp = &f; // workaround for t5336595
+    auto thrA = DSched::thread([fp,stress]{
+      while (true) {
+        auto deadline = Clock::now() + microseconds(1 << (stress % 20));
+        auto res = fp->futexWaitUntil(0, deadline);
+        EXPECT_TRUE(res == FutexResult::TIMEDOUT || res == FutexResult::AWOKEN);
+        if (res == FutexResult::AWOKEN) {
+          break;
+        }
       }
+    });
+
+    while (f.futexWake() != 1) {
+      std::this_thread::yield();
     }
-  });
 
-  while (f.futexWake() != 1) {
-    std::this_thread::yield();
+    DSched::join(thrA);
   }
 
-  DSched::join(thrA);
-
   auto start = Clock::now();
   EXPECT_EQ(f.futexWaitUntil(0, start + milliseconds(100)),
             FutexResult::TIMEDOUT);
   LOG(INFO) << "Futex wait timed out after waiting for "
             << duration_cast<milliseconds>(Clock::now() - start).count()
-            << "ms";
+            << "ms, should be ~100ms";
 }
 
 template <typename Clock>
@@ -96,10 +96,10 @@ void deterministicAtomicWaitUntilTests() {
   EXPECT_TRUE(res == FutexResult::TIMEDOUT || res == FutexResult::INTERRUPTED);
 }
 
-template <>
-void run_wait_until_tests<std::atomic>() {
-  stdAtomicWaitUntilTests<system_clock>();
-  stdAtomicWaitUntilTests<steady_clock>();
+template<template<typename> class Atom>
+void run_wait_until_tests() {
+  liveClockWaitUntilTests<Atom, system_clock>();
+  liveClockWaitUntilTests<Atom, steady_clock>();
 }
 
 template <>
@@ -177,6 +177,11 @@ TEST(Futex, basic_live) {
   run_wait_until_tests<std::atomic>();
 }
 
+TEST(Futex, basic_emulated) {
+  run_basic_tests<EmulatedFutexAtomic>();
+  run_wait_until_tests<EmulatedFutexAtomic>();
+}
+
 TEST(Futex, basic_deterministic) {
   DSched sched(DSched::uniform(0));
   run_basic_tests<DeterministicAtomic>();