AtomicCoreCachedSharedPtr
authorDave Watson <davejwatson@fb.com>
Wed, 19 Jul 2017 13:24:33 +0000 (06:24 -0700)
committerFacebook Github Bot <facebook-github-bot@users.noreply.github.com>
Wed, 19 Jul 2017 13:41:36 +0000 (06:41 -0700)
Summary: A folly::atomic_shared_ptr version of CoreCachedSharedPtr.

Reviewed By: yfeldblum

Differential Revision: D5389603

fbshipit-source-id: 942700cd66f5f5219418f4c6112146dc40351aa0

folly/concurrency/CoreCachedSharedPtr.h
folly/concurrency/test/CoreCachedSharedPtrTest.cpp

index ac89ac18ef137a5cc31a22b7c66f68726c3eb4e1..d294e75cf3a478dba5477aff135a105a626e8368 100644 (file)
@@ -20,7 +20,9 @@
 #include <memory>
 
 #include <folly/Enumerate.h>
+#include <folly/concurrency/AtomicSharedPtr.h>
 #include <folly/concurrency/CacheLocality.h>
+#include <folly/experimental/hazptr/hazptr.h>
 
 namespace folly {
 
@@ -82,4 +84,66 @@ class CoreCachedWeakPtr {
   std::array<std::weak_ptr<T>, kNumSlots> slots_;
 };
 
+/**
+ * This class creates core-local caches for a given shared_ptr, to
+ * mitigate contention when acquiring/releasing it.
+ *
+ * All methods are threadsafe.  Hazard pointers are used to avoid
+ * use-after-free for concurrent reset() and get() operations.
+ *
+ * Concurrent reset()s are sequenced with respect to each other: the
+ * sharded shared_ptrs will always all be set to the same value.
+ * get()s will never see a newer pointer on one core, and an older
+ * pointer on another after a subsequent thread migration.
+ */
+template <class T, size_t kNumSlots = 64>
+class AtomicCoreCachedSharedPtr {
+ public:
+  explicit AtomicCoreCachedSharedPtr(const std::shared_ptr<T>& p = nullptr) {
+    reset(p);
+  }
+
+  ~AtomicCoreCachedSharedPtr() {
+    auto slots = slots_.load(std::memory_order_acquire);
+    // Delete of AtomicCoreCachedSharedPtr must be synchronized, no
+    // need for stlots->retire().
+    if (slots) {
+      delete slots;
+    }
+  }
+
+  void reset(const std::shared_ptr<T>& p = nullptr) {
+    auto newslots = folly::make_unique<Slots>();
+    // Allocate each Holder in a different CoreAllocator stripe to
+    // prevent false sharing. Their control blocks will be adjacent
+    // thanks to allocate_shared().
+    for (auto slot : folly::enumerate(newslots->slots_)) {
+      auto alloc = getCoreAllocatorStl<Holder, kNumSlots>(slot.index);
+      auto holder = std::allocate_shared<Holder>(alloc, p);
+      *slot = std::shared_ptr<T>(holder, p.get());
+    }
+
+    auto oldslots = slots_.exchange(newslots.release());
+    if (oldslots) {
+      oldslots->retire();
+    }
+  }
+
+  std::shared_ptr<T> get() const {
+    folly::hazptr::hazptr_holder hazptr;
+    auto slots = hazptr.get_protected(slots_);
+    if (!slots) {
+      return nullptr;
+    }
+    return (slots->slots_)[AccessSpreader<>::current(kNumSlots)];
+  }
+
+ private:
+  using Holder = std::shared_ptr<T>;
+  struct Slots : folly::hazptr::hazptr_obj_base<Slots> {
+    std::array<std::shared_ptr<T>, kNumSlots> slots_;
+  };
+  std::atomic<Slots*> slots_{nullptr};
+};
+
 } // namespace
index 909a63b20334e98450997b49ff478562952597c0..bd29427f3e2c1948bc98d6b70c5591af4e9f4860 100644 (file)
@@ -80,6 +80,13 @@ void benchmarkWeakPtrLock(size_t numThreads, size_t iters) {
   parallelRun([&] { return wp.lock(); }, numThreads, iters);
 }
 
+void benchmarkAtomicSharedPtrCopy(size_t numThreads, size_t iters) {
+  auto s = std::make_shared<int>(1);
+  folly::atomic_shared_ptr<int> p;
+  p.store(s);
+  parallelRun([&] { return p.load(); }, numThreads, iters);
+}
+
 void benchmarkCoreCachedSharedPtrGet(size_t numThreads, size_t iters) {
   folly::CoreCachedSharedPtr<int> p(std::make_shared<int>(1));
   parallelRun([&] { return p.get(); }, numThreads, iters);
@@ -91,6 +98,11 @@ void benchmarkCoreCachedWeakPtrLock(size_t numThreads, size_t iters) {
   parallelRun([&] { return wp.get().lock(); }, numThreads, iters);
 }
 
+void benchmarkAtomicCoreCachedSharedPtrGet(size_t numThreads, size_t iters) {
+  folly::AtomicCoreCachedSharedPtr<int> p(std::make_shared<int>(1));
+  parallelRun([&] { return p.get(); }, numThreads, iters);
+}
+
 } // namespace
 
 BENCHMARK(SharedPtrSingleThread, n) {
@@ -99,12 +111,18 @@ BENCHMARK(SharedPtrSingleThread, n) {
 BENCHMARK(WeakPtrSingleThread, n) {
   benchmarkWeakPtrLock(1, n);
 }
+BENCHMARK(AtomicSharedPtrSingleThread, n) {
+  benchmarkAtomicSharedPtrCopy(1, n);
+}
 BENCHMARK(CoreCachedSharedPtrSingleThread, n) {
   benchmarkCoreCachedSharedPtrGet(1, n);
 }
 BENCHMARK(CoreCachedWeakPtrSingleThread, n) {
   benchmarkCoreCachedWeakPtrLock(1, n);
 }
+BENCHMARK(AtomicCoreCachedSharedPtrSingleThread, n) {
+  benchmarkAtomicCoreCachedSharedPtrGet(1, n);
+}
 
 BENCHMARK_DRAW_LINE();
 
@@ -114,12 +132,18 @@ BENCHMARK(SharedPtr4Threads, n) {
 BENCHMARK(WeakPtr4Threads, n) {
   benchmarkWeakPtrLock(4, n);
 }
+BENCHMARK(AtomicSharedPtr4Threads, n) {
+  benchmarkAtomicSharedPtrCopy(4, n);
+}
 BENCHMARK(CoreCachedSharedPtr4Threads, n) {
   benchmarkCoreCachedSharedPtrGet(4, n);
 }
 BENCHMARK(CoreCachedWeakPtr4Threads, n) {
   benchmarkCoreCachedWeakPtrLock(4, n);
 }
+BENCHMARK(AtomicCoreCachedSharedPtr4Threads, n) {
+  benchmarkAtomicCoreCachedSharedPtrGet(4, n);
+}
 
 BENCHMARK_DRAW_LINE();
 
@@ -129,12 +153,39 @@ BENCHMARK(SharedPtr16Threads, n) {
 BENCHMARK(WeakPtr16Threads, n) {
   benchmarkWeakPtrLock(16, n);
 }
+BENCHMARK(AtomicSharedPtr16Threads, n) {
+  benchmarkAtomicSharedPtrCopy(16, n);
+}
 BENCHMARK(CoreCachedSharedPtr16Threads, n) {
   benchmarkCoreCachedSharedPtrGet(16, n);
 }
 BENCHMARK(CoreCachedWeakPtr16Threads, n) {
   benchmarkCoreCachedWeakPtrLock(16, n);
 }
+BENCHMARK(AtomicCoreCachedSharedPtr16Threads, n) {
+  benchmarkAtomicCoreCachedSharedPtrGet(16, n);
+}
+
+BENCHMARK_DRAW_LINE();
+
+BENCHMARK(SharedPtrSingleThreadReset, n) {
+  auto p = std::make_shared<int>(1);
+  parallelRun([&] { p = std::make_shared<int>(1); }, 1, n);
+}
+BENCHMARK(AtomicSharedPtrSingleThreadReset, n) {
+  auto s = std::make_shared<int>(1);
+  folly::atomic_shared_ptr<int> p;
+  p.store(s);
+  parallelRun([&] { p.store(std::make_shared<int>(1)); }, 1, n);
+}
+BENCHMARK(CoreCachedSharedPtrSingleThreadReset, n) {
+  folly::CoreCachedSharedPtr<int> p(std::make_shared<int>(1));
+  parallelRun([&] { p.reset(std::make_shared<int>(1)); }, 1, n);
+}
+BENCHMARK(AtomicCoreCachedSharedPtrSingleThreadReset, n) {
+  folly::AtomicCoreCachedSharedPtr<int> p(std::make_shared<int>(1));
+  parallelRun([&] { p.reset(std::make_shared<int>(1)); }, 1, n);
+}
 
 int main(int argc, char** argv) {
   testing::InitGoogleTest(&argc, argv);