synchronization/ParkingLot
author: Dave Watson <davejwatson@fb.com>
Thu, 28 Dec 2017 15:46:10 +0000 (07:46 -0800)
committer: Facebook Github Bot <facebook-github-bot@users.noreply.github.com>
Thu, 28 Dec 2017 15:54:39 +0000 (07:54 -0800)
Summary:
A ParkingLot API inspired by the Linux futex syscall and WebKit's ParkingLot.

Extends the futex interface with lambdas, so that many different sleeping
abstractions can be built on top of it.

Reviewed By: yfeldblum, aary

Differential Revision: D6581826

fbshipit-source-id: dba741fe4ed34f27bfad5f5747adce85741441e0

folly/Makefile.am
folly/synchronization/ParkingLot.cpp [new file with mode: 0644]
folly/synchronization/ParkingLot.h [new file with mode: 0644]
folly/synchronization/test/ParkingLotTest.cpp [new file with mode: 0644]

index 9bc83a13cb0caa5e4979b568da0a0c1a07e53a5d..76a95848f2e070cd8b481f4376926d4a323dd72a 100644 (file)
@@ -437,6 +437,7 @@ nobase_follyinclude_HEADERS = \
        synchronization/Baton.h \
        synchronization/CallOnce.h \
        synchronization/LifoSem.h \
+       synchronization/ParkingLot.h \
        synchronization/detail/AtomicUtils.h \
        synchronization/detail/Sleeper.h \
        system/MemoryMapping.h \
@@ -625,6 +626,7 @@ libfolly_la_SOURCES = \
        stats/TimeseriesHistogram.cpp \
        synchronization/AsymmetricMemoryBarrier.cpp \
        synchronization/LifoSem.cpp \
+       synchronization/ParkingLot.cpp \
        system/MemoryMapping.cpp \
        system/Shell.cpp \
        system/ThreadName.cpp \
diff --git a/folly/synchronization/ParkingLot.cpp b/folly/synchronization/ParkingLot.cpp
new file mode 100644 (file)
index 0000000..e3594cc
--- /dev/null
@@ -0,0 +1,34 @@
+/*
+ * Copyright 2017-present Facebook, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#include <folly/synchronization/ParkingLot.h>
+
+namespace folly {
+namespace parking_lot_detail {
+
+// Returns the bucket that `key` maps to in the fixed, function-local bucket
+// array: 256 buckets when kIsMobile, 4096 otherwise.
+Bucket& Bucket::bucketFor(uint64_t key) {
+  constexpr size_t kBucketCount = kIsMobile ? 256 : 4096;
+  using BucketArray = std::array<Bucket, kBucketCount>;
+
+  // Statically allocating this lets us use this in allocation-sensitive
+  // contexts. This relies on the assumption that std::mutex won't dynamically
+  // allocate memory, which we assume to be the case on Linux and iOS.
+  static Indestructible<BucketArray> gBuckets;
+
+  auto& buckets = *gBuckets;
+  return buckets[key % kBucketCount];
+}
+
+std::atomic<uint64_t> idallocator{0}; // next lotid_ handed to a ParkingLot
+
+} // namespace parking_lot_detail
+} // namespace folly
diff --git a/folly/synchronization/ParkingLot.h b/folly/synchronization/ParkingLot.h
new file mode 100644 (file)
index 0000000..4ee87f7
--- /dev/null
@@ -0,0 +1,278 @@
+/*
+ * Copyright 2017-present Facebook, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#pragma once
+
+#include <atomic>
+#include <condition_variable>
+#include <mutex>
+
+#include <boost/intrusive/list.hpp>
+
+#include <folly/Hash.h>
+#include <folly/Indestructible.h>
+#include <folly/Optional.h>
+#include <folly/Portability.h>
+#include <folly/Unit.h>
+
+namespace folly {
+
+namespace parking_lot_detail {
+
+struct WaitNodeBase : public boost::intrusive::list_base_hook<> {
+  const uint64_t key_;
+  const uint64_t lotid_;
+
+  // tricky: hold both bucket and node mutex to write, either to read
+  bool signaled_;
+  std::mutex mutex_;
+  std::condition_variable cond_;
+
+  WaitNodeBase(uint64_t key, uint64_t lotid)
+      : key_(key), lotid_(lotid), signaled_(false) {}
+
+  template <typename Clock, typename Duration>
+  std::cv_status wait(std::chrono::time_point<Clock, Duration> deadline) {
+    std::cv_status status = std::cv_status::no_timeout;
+    std::unique_lock<std::mutex> nodeLock(mutex_);
+    while (!signaled_ && status != std::cv_status::timeout) {
+      if (deadline != std::chrono::time_point<Clock, Duration>::max()) {
+        status = cond_.wait_until(nodeLock, deadline);
+      } else {
+        cond_.wait(nodeLock);
+      }
+    }
+    return status;
+  }
+
+  void wake() {
+    std::unique_lock<std::mutex> nodeLock(mutex_);
+    signaled_ = true;
+    cond_.notify_one();
+  }
+
+  bool signaled() {
+    return signaled_;
+  }
+};
+
+extern std::atomic<uint64_t> idallocator; // source of per-ParkingLot lot ids
+
+// Our emulated futex uses a fixed array of wait-node lists (4096 of them,
+// or 256 when kIsMobile; see Bucket::bucketFor).  There are two levels of
+// locking: a per-list mutex that controls access to the list and a per-node
+// mutex, condvar, and bool that are used for the actual wakeups.  The
+// per-node mutex allows us to do precise wakeups without thundering herds.
+struct Bucket {
+  std::mutex mutex_; // taken by park_until/unpark before touching waiters_
+  boost::intrusive::list<WaitNodeBase> waiters_;
+
+  static Bucket& bucketFor(uint64_t key);
+};
+
+} // namespace parking_lot_detail
+
+enum class UnparkControl {
+  RetainContinue, // leave this waiter parked; keep scanning the waiter list
+  RemoveContinue, // unlink and wake this waiter; keep scanning
+  RetainBreak, // leave this waiter parked; stop scanning
+  RemoveBreak, // unlink and wake this waiter; stop scanning
+};
+
+enum class ParkResult {
+  Skip, // toPark() returned false, so the caller was never queued
+  Unpark, // the waiter was signaled by unpark()
+  Timeout, // the deadline passed and the node was unlinked unsignaled
+};
+
+/*
+ * ParkingLot provides an interface that is similar to Linux's futex
+ * system call, but with additional functionality.  It is implemented
+ * in a portable way on top of std::mutex and std::condition_variable.
+ *
+ * Additional reading:
+ * https://webkit.org/blog/6161/locking-in-webkit/
+ * https://github.com/WebKit/webkit/blob/master/Source/WTF/wtf/ParkingLot.h
+ * https://locklessinc.com/articles/futex_cheat_sheet/
+ *
+ * The main difference from futex is that park/unpark take lambdas,
+ * such that nearly anything can be done while holding the bucket
+ * lock.  Unpark() lambda can also be used to wake up any number of
+ * waiters.
+ *
+ * ParkingLot is templated on the data type, however, all ParkingLot
+ * implementations are backed by a single static array of buckets to
+ * avoid large memory overhead.  Lambdas will only ever be called on
+ * the specific ParkingLot's nodes.
+ */
+template <typename Data = Unit>
+class ParkingLot {
+  const uint64_t lotid_;
+  ParkingLot(const ParkingLot&) = delete;
+
+  struct WaitNode : public parking_lot_detail::WaitNodeBase {
+    const Data data_;
+
+    template <typename D>
+    WaitNode(uint64_t key, uint64_t lotid, D&& data)
+        : WaitNodeBase(key, lotid), data_(std::forward<Data>(data)) {}
+  };
+
+ public:
+  ParkingLot() : lotid_(parking_lot_detail::idallocator++) {}
+
+  /* Park API
+   *
+   * Key is almost always the address of a variable.
+   *
+   * ToPark runs while holding the bucket lock: usually this
+   * is a check to see if we can sleep, by checking waiter bits.
+   *
+   * PreWait is usually used to implement condition variable like
+   * things, such that you can unlock the condition variable's lock at
+   * the appropriate time.
+   */
+  template <typename Key, typename D, typename ToPark, typename PreWait>
+  ParkResult park(const Key key, D&& data, ToPark&& toPark, PreWait&& preWait) {
+    return park_until(
+        key,
+        std::forward<D>(data),
+        std::forward<ToPark>(toPark),
+        std::forward<PreWait>(preWait),
+        std::chrono::steady_clock::time_point::max());
+  }
+
+  template <
+      typename Key,
+      typename D,
+      typename ToPark,
+      typename PreWait,
+      typename Clock,
+      typename Duration>
+  ParkResult park_until(
+      const Key key,
+      D&& data,
+      ToPark&& toPark,
+      PreWait&& preWait,
+      std::chrono::time_point<Clock, Duration> deadline);
+
+  template <
+      typename Key,
+      typename D,
+      typename ToPark,
+      typename PreWait,
+      typename Rep,
+      typename Period>
+  ParkResult park_for(
+      const Key key,
+      D&& data,
+      ToPark&& toPark,
+      PreWait&& preWait,
+      std::chrono::duration<Rep, Period>& timeout) {
+    return park_until(
+        key,
+        std::forward<D>(data),
+        std::forward<ToPark>(toPark),
+        std::forward<PreWait>(preWait),
+        timeout + std::chrono::steady_clock::now());
+  }
+
+  /*
+   * Unpark API
+   *
+   * Key is the same uniqueaddress used in park(), and is used as a
+   * hash key for lookup of waiters.
+   *
+   * Unparker is a function that is given the Data parameter, and
+   * returns an UnparkControl.  The Remove* results will remove and
+   * wake the waiter, the Ignore/Stop results will not, while stopping
+   * or continuing iteration of the waiter list.
+   */
+  template <typename Key, typename Unparker>
+  void unpark(const Key key, Unparker&& func);
+};
+
+template <typename Data>
+template <
+    typename Key,
+    typename D,
+    typename ToPark,
+    typename PreWait,
+    typename Clock,
+    typename Duration>
+ParkResult ParkingLot<Data>::park_until(
+    const Key bits,
+    D&& data,
+    ToPark&& toPark,
+    PreWait&& preWait,
+    std::chrono::time_point<Clock, Duration> deadline) {
+  auto key = hash::twang_mix64(uint64_t(bits));
+  auto& bucket = parking_lot_detail::Bucket::bucketFor(key);
+  WaitNode node(key, lotid_, std::forward<D>(data));
+
+  {
+    std::unique_lock<std::mutex> bucketLock(bucket.mutex_);
+
+    if (!std::forward<ToPark>(toPark)()) {
+      return ParkResult::Skip;
+    }
+
+    bucket.waiters_.push_back(node);
+  } // bucketLock scope
+
+  std::forward<PreWait>(preWait)();
+
+  auto status = node.wait(deadline);
+
+  if (status == std::cv_status::timeout) {
+    // it's not really a timeout until we unlink the unsignaled node
+    std::unique_lock<std::mutex> bucketLock(bucket.mutex_);
+    if (!node.signaled()) {
+      bucket.waiters_.erase(bucket.waiters_.iterator_to(node));
+      return ParkResult::Timeout;
+    }
+  }
+
+  return ParkResult::Unpark;
+}
+
+template <typename Data>
+template <typename Key, typename Func>
+void ParkingLot<Data>::unpark(const Key bits, Func&& func) {
+  auto key = hash::twang_mix64(uint64_t(bits));
+  auto& bucket = parking_lot_detail::Bucket::bucketFor(key);
+  std::unique_lock<std::mutex> bucketLock(bucket.mutex_);
+
+  for (auto iter = bucket.waiters_.begin(); iter != bucket.waiters_.end();) {
+    auto current = iter;
+    auto& node = *static_cast<WaitNode*>(&*iter++);
+    if (node.key_ == key && node.lotid_ == lotid_) {
+      auto result = std::forward<Func>(func)(node.data_);
+      if (result == UnparkControl::RemoveBreak ||
+          result == UnparkControl::RemoveContinue) {
+        // we unlink, but waiter destroys the node
+        bucket.waiters_.erase(current);
+
+        node.wake();
+      }
+      if (result == UnparkControl::RemoveBreak ||
+          result == UnparkControl::RetainBreak) {
+        return;
+      }
+    }
+  }
+}
+
+} // namespace folly
diff --git a/folly/synchronization/test/ParkingLotTest.cpp b/folly/synchronization/test/ParkingLotTest.cpp
new file mode 100644 (file)
index 0000000..d31a499
--- /dev/null
@@ -0,0 +1,120 @@
+/*
+ * Copyright 2017 Facebook, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include <chrono>
+#include <thread>
+
+#include <folly/synchronization/ParkingLot.h>
+
+#include <folly/portability/GTest.h>
+#include <folly/synchronization/Baton.h>
+
+using namespace folly;
+
+TEST(ParkingLot, multilot) {
+  using SmallLot = ParkingLot<bool>;
+  using LargeLot = ParkingLot<uint64_t>;
+  SmallLot smalllot;
+  LargeLot largelot;
+  folly::Baton<> sb;
+  folly::Baton<> lb;
+
+  std::thread small([&]() {
+    smalllot.park(0, false, [] { return true; }, [&]() { sb.post(); });
+  });
+  std::thread large([&]() {
+    largelot.park(0, true, [] { return true; }, [&]() { lb.post(); });
+  });
+  sb.wait();
+  lb.wait();
+
+  int count = 0;
+  smalllot.unpark(0, [&](bool data) {
+    count++;
+    EXPECT_EQ(data, false);
+    return UnparkControl::RemoveContinue;
+  });
+  EXPECT_EQ(count, 1);
+  count = 0;
+  largelot.unpark(0, [&](bool data) {
+    count++;
+    EXPECT_EQ(data, true);
+    return UnparkControl::RemoveContinue;
+  });
+  EXPECT_EQ(count, 1);
+
+  small.join();
+  large.join();
+}
+
+// This is not possible to implement with Futex, because futex
+// and the native linux syscall are 32-bit only.
+TEST(ParkingLot, LargeWord) {
+  ParkingLot<uint64_t> lot;
+  std::atomic<uint64_t> w{0};
+
+  // toPark returns false (w is 0, not 1), so park() must return Skip without
+  // sleeping; if it parked instead, the test would hang.
+  const auto result = lot.park(0, false, [&]() { return w == 1; }, []() {});
+  EXPECT_TRUE(result == ParkResult::Skip);
+}
+
+class WaitableMutex : public std::mutex {
+  using Lot = ParkingLot<std::function<bool(void)>>;
+  static Lot lot;
+
+ public:
+  void unlock() {
+    bool unparked = false;
+    lot.unpark(uint64_t(this), [&](std::function<bool(void)> wfunc) {
+      if (wfunc()) {
+        unparked = true;
+        return UnparkControl::RemoveBreak;
+      } else {
+        return UnparkControl::RemoveContinue;
+      }
+    });
+    if (!unparked) {
+      std::mutex::unlock();
+    }
+    // Otherwise, we pass mutex directly to waiter without needing to unlock.
+  }
+
+  template <typename Wait>
+  void wait(Wait wfunc) {
+    lot.park(
+        uint64_t(this),
+        wfunc,
+        [&]() { return !wfunc(); },
+        [&]() { std::mutex::unlock(); });
+  }
+};
+
+WaitableMutex::Lot WaitableMutex::lot;
+
+// A waiter blocks in WaitableMutex::wait() until another thread sets `go`
+// and unlocks the mutex.
+TEST(ParkingLot, WaitableMutexTest) {
+  std::atomic<bool> go{false};
+  WaitableMutex mu;
+  std::thread t([&]() {
+    std::lock_guard<WaitableMutex> g(mu);
+    mu.wait([&]() { return go == true; });
+  });
+  // Give the waiter a chance to park first.  If it has not, wait() simply
+  // skips parking because go is already true.  Uses the standard sleep
+  // instead of POSIX sleep(), which needs <unistd.h>.
+  std::this_thread::sleep_for(std::chrono::seconds(1));
+
+  {
+    std::lock_guard<WaitableMutex> g(mu);
+    go = true;
+  }
+  t.join();
+}