RCU
authorDave Watson <davejwatson@fb.com>
Wed, 13 Dec 2017 16:13:20 +0000 (08:13 -0800)
committerFacebook Github Bot <facebook-github-bot@users.noreply.github.com>
Wed, 13 Dec 2017 16:21:09 +0000 (08:21 -0800)
Summary: This adds an RCU implementation, matching http://www.open-std.org/jtc1/sc22/wg21/docs/papers/2017/p0461r1.pdf as closely as pratical.  This implementation does not require thread registration or quiescence.

Reviewed By: magedm

Differential Revision: D6330631

fbshipit-source-id: 2c729f3a4c0f151cde5d9a599ecd2a2c20c7da55

folly/synchronization/Rcu-inl.h [new file with mode: 0644]
folly/synchronization/Rcu.cpp [new file with mode: 0644]
folly/synchronization/Rcu.h [new file with mode: 0644]
folly/synchronization/detail/ThreadCachedInts.h [new file with mode: 0644]
folly/synchronization/detail/ThreadCachedLists.h [new file with mode: 0644]
folly/synchronization/test/RcuTest.cpp [new file with mode: 0644]

diff --git a/folly/synchronization/Rcu-inl.h b/folly/synchronization/Rcu-inl.h
new file mode 100644 (file)
index 0000000..162a6e0
--- /dev/null
@@ -0,0 +1,175 @@
+/*
+ * Copyright 2017-present Facebook, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include <folly/Function.h>
+#include <folly/detail/AtFork.h>
+#include <folly/detail/TurnSequencer.h>
+
+namespace folly {
+
+template <typename Tag>
+bool rcu_domain<Tag>::singleton_{false};
+
+template <typename Tag>
+rcu_domain<Tag>::rcu_domain(Executor* executor) noexcept
+    : executor_(executor ? executor : &QueuedImmediateExecutor::instance()) {
+  // Please use a unique tag for each domain.
+  CHECK(!singleton_);
+  singleton_ = true;
+
+  // Register fork handlers.  Holding read locks across fork is not
+  // supported.  Using read locks in other atfork handlers is not
+  // supported.  Other atfork handlers launching new child threads
+  // that use read locks *is* supported.
+  detail::AtFork::registerHandler(
+      this,
+      [this]() { syncMutex_.lock(); },
+      [this]() { syncMutex_.unlock(); },
+      [this]() {
+        counters_.resetAfterFork();
+        syncMutex_.unlock();
+      });
+}
+
+template <typename Tag>
+rcu_domain<Tag>::~rcu_domain() {
+  detail::AtFork::unregisterHandler(this);
+}
+
+template <typename Tag>
+rcu_token rcu_domain<Tag>::lock_shared() {
+  auto idx = version_.load(std::memory_order_acquire);
+  idx &= 1;
+  counters_.increment(idx);
+
+  return idx;
+}
+
+template <typename Tag>
+void rcu_domain<Tag>::unlock_shared(rcu_token&& token) {
+  DCHECK(0 == token.epoch_ || 1 == token.epoch_);
+  counters_.decrement(token.epoch_);
+}
+
+template <typename Tag>
+template <typename T>
+void rcu_domain<Tag>::call(T&& cbin) {
+  auto node = new list_node;
+  node->cb_ = [node, cb = std::forward<T>(cbin)]() {
+    cb();
+    delete node;
+  };
+  retire(node);
+}
+
+template <typename Tag>
+void rcu_domain<Tag>::retire(list_node* node) noexcept {
+  q_.push(node);
+
+  // Note that it's likely we hold a read lock here,
+  // so we can only half_sync(false).  half_sync(true)
+  // or a synchronize() call might block forever.
+  uint64_t time = std::chrono::duration_cast<std::chrono::milliseconds>(
+                      std::chrono::steady_clock::now().time_since_epoch())
+                      .count();
+  if (time > syncTime_.load(std::memory_order_relaxed) + syncTimePeriod_) {
+    list_head finished;
+    {
+      std::lock_guard<std::mutex> g(syncMutex_);
+      syncTime_.store(time, std::memory_order_relaxed);
+      half_sync(false, finished);
+    }
+    // callbacks are called outside of syncMutex_
+    finished.forEach(
+        [&](list_node* item) { executor_->add(std::move(item->cb_)); });
+  }
+}
+
+template <typename Tag>
+void rcu_domain<Tag>::synchronize() noexcept {
+  auto curr = version_.load(std::memory_order_acquire);
+  // Target is two epochs away.
+  auto target = curr + 2;
+  while (true) {
+    // Try to assign ourselves to do the sync work.
+    // If someone else is already assigned, we can wait for
+    // the work to be finished by waiting on turn_.
+    auto work = work_.load(std::memory_order_acquire);
+    auto tmp = work;
+    if (work < target && work_.compare_exchange_strong(tmp, target)) {
+      list_head finished;
+      {
+        std::lock_guard<std::mutex> g(syncMutex_);
+        while (version_.load(std::memory_order_acquire) < target) {
+          half_sync(true, finished);
+        }
+      }
+      // callbacks are called outside of syncMutex_
+      finished.forEach(
+          [&](list_node* node) { executor_->add(std::move(node->cb_)); });
+      return;
+    } else {
+      if (version_.load(std::memory_order_acquire) >= target) {
+        return;
+      }
+      std::atomic<uint32_t> cutoff{100};
+      // Wait for someone to finish the work.
+      turn_.tryWaitForTurn(work, cutoff, false);
+    }
+  }
+}
+
+/*
+ * Not multithread safe, but it could be with proper version
+ * checking and stronger increment of version.  See
+ * https://github.com/pramalhe/ConcurrencyFreaks/blob/master/papers/gracesharingurcu-2016.pdf
+ *
+ * This version, however, can go to sleep if there are outstanding
+ * readers, and does not spin or need rescheduling, unless blocking = false.
+ */
+template <typename Tag>
+void rcu_domain<Tag>::half_sync(bool blocking, list_head& finished) {
+  uint64_t curr = version_.load(std::memory_order_acquire);
+  auto next = curr + 1;
+
+  // Push all work to a queue for moving through two epochs.  One
+  // version is not enough because of late readers of the version_
+  // counter in lock_shared.
+  //
+  // Note that for a similar reason we can't swap out the q here,
+  // and instead drain it, so concurrent calls to call() are safe,
+  // and will wait for the next epoch.
+  q_.collect(queues_[0]);
+
+  if (blocking) {
+    counters_.waitForZero(next & 1);
+  } else {
+    if (counters_.readFull(next & 1) != 0) {
+      return;
+    }
+  }
+
+  // Run callbacks that have been through two epochs, and swap queues
+  // for those only through a single epoch.
+  finished.splice(queues_[1]);
+  queues_[1].splice(queues_[0]);
+
+  version_.store(next, std::memory_order_release);
+  // Notify synchronous waiters in synchronize().
+  turn_.completeTurn(curr);
+}
+
+} // namespace folly
diff --git a/folly/synchronization/Rcu.cpp b/folly/synchronization/Rcu.cpp
new file mode 100644 (file)
index 0000000..c74c1c2
--- /dev/null
@@ -0,0 +1,22 @@
+/*
+ * Copyright 2017-present Facebook, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#include <folly/synchronization/Rcu.h>
+
+namespace folly {
+
+FOLLY_STATIC_CTOR_PRIORITY_MAX rcu_domain<RcuTag> rcu_default_domain_;
+
+} // namespace folly
diff --git a/folly/synchronization/Rcu.h b/folly/synchronization/Rcu.h
new file mode 100644 (file)
index 0000000..670ec78
--- /dev/null
@@ -0,0 +1,462 @@
+/*
+ * Copyright 2017-present Facebook, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#pragma once
+
+#include <atomic>
+#include <functional>
+#include <limits>
+
+#include <folly/Optional.h>
+#include <folly/detail/TurnSequencer.h>
+#include <folly/executors/QueuedImmediateExecutor.h>
+#include <folly/synchronization/detail/ThreadCachedInts.h>
+#include <folly/synchronization/detail/ThreadCachedLists.h>
+
+// Implementation of proposed RCU C++ API
+// http://open-std.org/JTC1/SC22/WG21/docs/papers/2017/p0566r3.pdf
+
+// Overview
+
+// This file provides a low-overhead mechanism to guarantee ordering
+// between operations on shared data. In the simplest usage pattern,
+// readers enter a critical section, view some state, and leave the
+// critical section, while writers modify shared state and then defer
+// some cleanup operations. Proper use of these classes will guarantee
+// that a cleanup operation that is deferred during a reader critical
+// section will not be executed until after that critical section is
+// over.
+
+// Example
+
+// As an example, suppose we have some configuration data that gets
+// periodically updated. We need to protect ourselves on *every* read
+// path, even if updates are only vanishly rare, because we don't know
+// if some writer will come along and yank the data out from under us.
+//
+// Here's how that might look without deferral:
+//
+// void doSomething(IPAddress host);
+//
+// folly::SharedMutex sm;
+// ConfigData* globalConfigData;
+//
+// void reader() {
+//   while (true) {
+//     sm.lock_shared();
+//     IPAddress curManagementServer = globalConfigData->managementServerIP;
+//     sm.unlock_shared();
+//     doSomethingWith(curManagementServer);
+//   }
+// }
+//
+// void writer() {
+//   while (true) {
+//     std::this_thread::sleep_for(std::chrono::seconds(60));
+//     ConfigData* oldConfigData = globalConfigData;
+//     ConfigData* newConfigData = loadConfigDataFromRemoteServer();
+//     sm.lock();
+//     globalConfigData = newConfigData;
+//     sm.unlock();
+//     delete oldConfigData;
+//   }
+// }
+//
+// The readers have to lock_shared and unlock_shared every iteration, even
+// during the overwhelming majority of the time in which there's no writer
+// present. These functions are surprisingly expensive; there's around 30ns of
+// overhead per critical section on my machine.
+//
+// Let's get rid of the locking. The readers and writer may proceed
+// concurrently. Here's how this would look:
+//
+// void doSomething(IPAddress host);
+//
+// std::atomic<ConfigData*> globalConfigData;
+//
+// void reader() {
+//   while (true) {
+//     ConfigData* configData = globalConfigData.load();
+//     IPAddress curManagementServer = configData->managementServerIP;
+//     doSomethingWith(curManagementServer);
+//  }
+// }
+//
+// void writer() {
+//   while (true) {
+//     std::this_thread::sleep_for(std::chrono::seconds(60));
+//     ConfigData* newConfigData = loadConfigDataFromRemoteServer();
+//     globalConfigData.store(newConfigData);
+//     // We can't delete the old ConfigData; we don't know when the readers
+//     // will be done with it! I guess we have to leak it...
+//   }
+// }
+//
+// This works and is fast, but we don't ever reclaim the memory we
+// allocated for the copy of the data. We need a way for the writer to
+// know that no readers observed the old value of the pointer and are
+// still using it. Tweaking this slightly, we want a way for the
+// writer to say "I want some operation (deleting the old ConfigData)
+// to happen, but only after I know that all readers that started
+// before I requested the action have finished.". The classes in this
+// namespace allow this. Here's how we'd express this idea:
+//
+// void doSomething(IPAddress host);
+// std::atomic<ConfigData*> globalConfigData;
+//
+//
+// void reader() {
+//   while (true) {
+//     IPAddress curManagementServer;
+//     {
+//       // We're about to do some reads we want to protect; if we read a
+//       // pointer, we need to make sure that if the writer comes along and
+//       // updates it, the writer's cleanup operation won't happen until we're
+//       // done accessing the pointed-to data. We get a Guard on that
+//       // domain; as long as it exists, no function subsequently passed to
+//       // invokeEventually will execute.
+//       rcu_reader guard;
+//       ConfigData* configData = globalConfigData.load();
+//       // We created a guard before we read globalConfigData; we know that the
+//       // pointer will remain valid until the guard is destroyed.
+//       curManagementServer = configData->managementServerIP;
+//       // Guard is released here; retired objects may be freed.
+//     }
+//     doSomethingWith(curManagementServer);
+//   }
+// }
+//
+// void writer() {
+//
+//   while (true) {
+//     std::this_thread::sleep_for(std::chrono::seconds(60));
+//     ConfigData* oldConfigData = globalConfigData.load();
+//     ConfigData* newConfigData = loadConfigDataFromRemoteServer();
+//     globalConfigData.store(newConfigData);
+//     rcu_retire(oldConfigData);
+//     // alternatively, in a blocking manner:
+//     //   synchronize_rcu();
+//     //   delete oldConfigData;
+//   }
+// }
+//
+// This gets us close to the speed of the second solution, without
+// leaking memory. A rcu_reader costs about 4 ns, faster than the
+// lock_shared() / unlock_shared() pair in the more traditional
+// mutex-based approach from our first attempt, and the writer
+// never blocks the readers.
+
+// Notes
+
+// This implementation does implement an rcu_domain, and provides a default
+// one for use per the standard implementation.
+//
+// rcu_domain:
+//   A "universe" of deferred execution. Each rcu_domain has an
+//   executor on which deferred functions may execute. Readers obtain
+//   Tokens from an rcu_domain and release them back to it.
+//   rcu_domains should in general be completely separated; it's never
+//   correct to pass a token from one domain to another, and
+//   rcu_reader objects created on one domain do not prevent functions
+//   deferred on other domains from running. It's intended that most
+//   callers should only ever use the default, global domain.
+//
+//   Creation of a domain takes a template tag argument, which
+//   defaults to void. To access different domains, you have to pass a
+//   different tag.  The global domain is preferred for almost all
+//   purposes, unless a different executor is required.
+//
+//   You should use a custom rcu_domain if you can't avoid sleeping
+//   during reader critical sections (because you don't want to block
+//   all other users of the domain while you sleep), or you want to change
+//   the default executor type.
+
+// API correctness limitations:
+//  - Exceptions:
+//    In short, nothing about this is exception safe. retire functions should
+//    not throw exceptions in their destructors, move constructors or call
+//    operators.
+//
+// Performance limitations:
+//  - Blocking:
+//    A blocked reader will block invocation of deferred functions until it
+//    becomes unblocked. Sleeping while holding a rcu_reader can have bad
+//    performance consequences.
+//
+// API questions you might have:
+//  - Nested critical sections:
+//    These are fine. The following is explicitly allowed by the standard, up to
+//    a nesting level of 100:
+//        rcu_reader reader1;
+//        doSomeWork();
+//        rcu_reader reader2;
+//        doSomeMoreWork();
+//  - Restrictions on retired()ed functions:
+//    Any operation is safe from within a retired function's
+//    execution; you can retire additional functions or add a new domain call to
+//    the domain.
+//  - rcu_domain destruction:
+//    Destruction of a domain assumes previous synchronization: all remaining
+//    call and retire calls are immediately added to the executor.
+
+// Overheads
+
+// Deferral latency is as low as is practical: overhead involves running
+// several global memory barriers on the machine to ensure all readers are gone.
+//
+// Currently use of MPMCQueue is the bottleneck for deferred calls, more
+// specialized queues could be used if available, since only a single reader
+// reads the queue, and can splice all of the items to the executor if possible.
+//
+// synchronize_rcu() call latency is on the order of 10ms.  Multiple
+// separate threads can share a synchronized period and should scale.
+//
+// rcu_retire() is a queue push, and on the order of 150 ns, however,
+// the current implementation may synchronize if the retire queue is full,
+// resulting in tail latencies of ~10ms.
+//
+// rcu_reader creation/destruction is ~4ns.  By comparison,
+// folly::SharedMutex::lock_shared + unlock_shared pair is ~26ns
+
+// Hazard pointers vs. RCU:
+//
+// Hazard pointers protect pointers, generally malloc()d pieces of memory, and
+// each hazptr only protects one such block.
+//
+// RCU protects critical sections, *all* memory is protected as long
+// as the critical section is active.
+//
+// For example, this has implications for linked lists: If you wish to
+// free an entire linked list, you simply rcu_retire() each node with
+// RCU: readers will see either an entirely valid list, or no list at
+// all.
+//
+// Conversely with hazptrs, generally lists are walked with
+// hand-over-hand hazptrs.  Only one node is protected at a time.  So
+// anywhere in the middle of the list, hazptrs may read NULL, and throw
+// away all current work.  Alternatively, reference counting can be used,
+// (as if each node was a shared_ptr<node>), so that readers will always see
+// *the remaining part* of the list as valid, however parts before its current
+// hazptr may be freed.
+//
+// So roughly: RCU is simple, but an all-or-nothing affair.  A single rcu_reader
+// can block all reclamation. Hazptrs will reclaim exactly as much as possible,
+// at the cost of extra work writing traversal code
+//
+// Reproduced from
+// http://www.open-std.org/jtc1/sc22/wg21/docs/papers/2017/p0461r1.pdf
+//
+//                              Reference Counting    RCU            Hazptr
+//
+// Unreclaimed objects          Bounded               Unbounded      Bounded
+//
+// Contention among readers     High                  None           None
+//
+// Traversal forward progress   lock-free             wait-free      lock-free
+//
+// Reclamation forward progress lock-free             blocking       wait-free
+//
+// Traversal speed              atomic                no-overhead    folly's is
+//                                                                   no-overhead
+//
+// Reference acquisition        unconditional         unconditional  conditional
+//
+// Automatic reclamation        yes                   no             no
+//
+// Purpose of domains           N/A                   isolate slow configeration
+//                                                    readers
+
+namespace folly {
+
+struct RcuTag;
+
+template <typename Tag = RcuTag>
+class rcu_domain;
+
+// Opaque token used to match up lock_shared() and unlock_shared()
+// pairs.
+class rcu_token {
+ public:
+  rcu_token(uint64_t epoch) : epoch_(epoch) {}
+  rcu_token() {}
+  ~rcu_token() = default;
+
+  rcu_token(const rcu_token&) = delete;
+  rcu_token(rcu_token&& other) = default;
+  rcu_token& operator=(const rcu_token& other) = delete;
+  rcu_token& operator=(rcu_token&& other) = default;
+
+ private:
+  template <typename Tag>
+  friend class rcu_domain;
+  uint64_t epoch_;
+};
+
+// For most usages, rcu_domain is unnecessary, and you can use
+// rcu_reader and rcu_retire/synchronize_rcu directly.
+template <typename Tag>
+class rcu_domain {
+  using list_head = typename detail::ThreadCachedLists<Tag>::ListHead;
+  using list_node = typename detail::ThreadCachedLists<Tag>::Node;
+
+ public:
+  /*
+   * If an executor is passed, it is used to run calls and delete
+   * retired objects.
+   */
+  rcu_domain(Executor* executor = nullptr) noexcept;
+
+  rcu_domain(const rcu_domain&) = delete;
+  rcu_domain(rcu_domain&&) = delete;
+  rcu_domain& operator=(const rcu_domain&) = delete;
+  rcu_domain& operator=(rcu_domain&&) = delete;
+  ~rcu_domain();
+
+  // Reader locks: Prevent any calls from occuring, retired memory
+  // from being freed, and synchronize() calls from completing until
+  // all preceding lock_shared() sections are finished.
+
+  // Note: can potentially allocate on thread first use.
+  FOLLY_ALWAYS_INLINE rcu_token lock_shared();
+  FOLLY_ALWAYS_INLINE void unlock_shared(rcu_token&&);
+
+  // Call a function after concurrent critical sections have finished.
+  // Does not block unless the queue is full, then may block to wait
+  // for scheduler thread, but generally does not wait for full
+  // synchronization.
+  template <typename T>
+  void call(T&& cbin);
+  void retire(list_node* node) noexcept;
+
+  // Ensure concurrent critical sections have finished.
+  // Always waits for full synchronization.
+  // read lock *must not* be held.
+  void synchronize() noexcept;
+
+ private:
+  detail::ThreadCachedInts<Tag> counters_;
+  // Global epoch.
+  std::atomic<uint64_t> version_{0};
+  // Future epochs being driven by threads in synchronize
+  std::atomic<uint64_t> work_{0};
+  // Matches version_, for waking up threads in synchronize that are
+  // sharing an epoch.
+  detail::TurnSequencer<std::atomic> turn_;
+  // Only one thread can drive half_sync.
+  std::mutex syncMutex_;
+  // Currently half_sync takes ~16ms due to heavy barriers.
+  // Ensure that if used in a single thread, max GC time
+  // is limited to 1% of total CPU time.
+  static constexpr uint64_t syncTimePeriod_{1600 * 2 /* full sync is 2x */};
+  std::atomic<uint64_t> syncTime_{0};
+  // call()s waiting to move through two epochs.
+  detail::ThreadCachedLists<Tag> q_;
+  // Executor callbacks will eventually be run on.
+  Executor* executor_{nullptr};
+  static bool singleton_; // Ensure uniqueness per-tag.
+
+  // Queues for callbacks waiting to go through two epochs.
+  list_head queues_[2]{};
+
+  // Move queues through one epoch (half of a full synchronize()).
+  // Will block waiting for readers to exit if blocking is true.
+  // blocking must *not* be true if the current thread is locked,
+  // or will deadlock.
+  //
+  // returns a list of callbacks ready to run in cbs.
+  void half_sync(bool blocking, list_head& cbs);
+};
+
+extern rcu_domain<RcuTag> rcu_default_domain_;
+
+inline rcu_domain<RcuTag>* rcu_default_domain() {
+  return &rcu_default_domain_;
+}
+
+// Main reader guard class.
+class rcu_reader {
+ public:
+  FOLLY_ALWAYS_INLINE rcu_reader() noexcept
+      : epoch_(rcu_default_domain()->lock_shared()) {}
+  rcu_reader(std::defer_lock_t) noexcept {}
+  rcu_reader(const rcu_reader&) = delete;
+  rcu_reader(rcu_reader&& other) noexcept : epoch_(std::move(other.epoch_)) {}
+  rcu_reader& operator=(const rcu_reader&) = delete;
+  rcu_reader& operator=(rcu_reader&& other) noexcept {
+    if (epoch_.has_value()) {
+      rcu_default_domain()->unlock_shared(std::move(epoch_.value()));
+    }
+    epoch_ = std::move(other.epoch_);
+    return *this;
+  }
+
+  FOLLY_ALWAYS_INLINE ~rcu_reader() noexcept {
+    if (epoch_.has_value()) {
+      rcu_default_domain()->unlock_shared(std::move(epoch_.value()));
+    }
+  }
+
+  void swap(rcu_reader& other) noexcept {
+    std::swap(epoch_, other.epoch_);
+  }
+
+  FOLLY_ALWAYS_INLINE void lock() noexcept {
+    DCHECK(!epoch_.has_value());
+    epoch_ = rcu_default_domain()->lock_shared();
+  }
+
+  FOLLY_ALWAYS_INLINE void unlock() noexcept {
+    DCHECK(epoch_.has_value());
+    rcu_default_domain()->unlock_shared(std::move(epoch_.value()));
+  }
+
+ private:
+  Optional<rcu_token> epoch_;
+};
+
+inline void swap(rcu_reader& a, rcu_reader& b) noexcept {
+  a.swap(b);
+}
+
+inline void synchronize_rcu() noexcept {
+  rcu_default_domain()->synchronize();
+}
+
+inline void rcu_barrier() noexcept {
+  rcu_default_domain()->synchronize();
+}
+
+// Free-function retire.  Always allocates.
+template <typename T, typename D = std::default_delete<T>>
+void rcu_retire(T* p, D d = {}) {
+  rcu_default_domain()->call([p, del = std::move(d)]() { del(p); });
+}
+
+// Base class for rcu objects.  retire() will use preallocated storage
+// from rcu_obj_base, vs.  rcu_retire() which always allocates.
+template <typename T, typename D = std::default_delete<T>>
+class rcu_obj_base : detail::ThreadCachedListsBase::Node {
+ public:
+  void retire(D d = {}) {
+    // This implementation assumes folly::Function has enough
+    // inline storage for D, otherwise, it allocates.
+    this->cb_ = [this, d = std::move(d)]() { d(static_cast<T*>(this)); };
+    rcu_default_domain()->retire(this);
+  }
+};
+
+} // namespace folly
+
+#include <folly/synchronization/Rcu-inl.h>
diff --git a/folly/synchronization/detail/ThreadCachedInts.h b/folly/synchronization/detail/ThreadCachedInts.h
new file mode 100644 (file)
index 0000000..2b954c1
--- /dev/null
@@ -0,0 +1,173 @@
+/*
+ * Copyright 2017-present Facebook, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#pragma once
+
+#include <folly/Function.h>
+#include <folly/ThreadLocal.h>
+#include <folly/synchronization/AsymmetricMemoryBarrier.h>
+
+// This is unlike folly::ThreadCachedInt in that the full value
+// is never rounded up globally and cached, it only supports readFull.
+//
+// folly/experimental/TLRefCount is similar, but does not support a
+// waitForZero, and is not reset-able.
+//
+// Note that the RCU implementation is completely abstracted from the
+// counter implementation, a rseq implementation can be dropped in
+// if the kernel supports it.
+
+namespace folly {
+
+namespace detail {
+
+template <typename Tag>
+class ThreadCachedInts {
+  // These are only accessed under the ThreadLocal lock.
+  int64_t orphan_inc_[2]{0, 0};
+  int64_t orphan_dec_[2]{0, 0};
+  folly::detail::Futex<> waiting_;
+
+  class Integer {
+   public:
+    ThreadCachedInts* ints_;
+    constexpr Integer(ThreadCachedInts* ints) noexcept
+        : ints_(ints), inc_{}, dec_{} {}
+    std::atomic<int64_t> inc_[2];
+    std::atomic<int64_t> dec_[2];
+    ~Integer() noexcept {
+      ints_->orphan_inc_[0] += inc_[0].load(std::memory_order_relaxed);
+      ints_->orphan_inc_[1] += inc_[1].load(std::memory_order_relaxed);
+      ints_->orphan_dec_[0] += dec_[0].load(std::memory_order_relaxed);
+      ints_->orphan_dec_[1] += dec_[1].load(std::memory_order_relaxed);
+      ints_->waiting_.store(0, std::memory_order_release);
+      ints_->waiting_.futexWake();
+    }
+  };
+  folly::ThreadLocalPtr<Integer, Tag> cs_;
+
+  // Cache the int pointer in a threadlocal.
+  static thread_local Integer* int_cache_;
+
+  void init() {
+    auto ret = new Integer(this);
+    cs_.reset(ret);
+    int_cache_ = ret;
+  }
+
+ public:
+  FOLLY_ALWAYS_INLINE void increment(uint8_t epoch) {
+    if (!int_cache_) {
+      init();
+    }
+
+    auto& c = int_cache_->inc_[epoch];
+    auto val = c.load(std::memory_order_relaxed);
+    c.store(val + 1, std::memory_order_relaxed);
+
+    folly::asymmetricLightBarrier(); // A
+  }
+
+  FOLLY_ALWAYS_INLINE void decrement(uint8_t epoch) {
+    folly::asymmetricLightBarrier(); // B
+    if (!int_cache_) {
+      init();
+    }
+
+    auto& c = int_cache_->dec_[epoch];
+    auto val = c.load(std::memory_order_relaxed);
+    c.store(val + 1, std::memory_order_relaxed);
+
+    folly::asymmetricLightBarrier(); // C
+    if (waiting_.load(std::memory_order_acquire)) {
+      waiting_.store(0, std::memory_order_release);
+      waiting_.futexWake();
+    }
+  }
+
+  int64_t readFull(uint8_t epoch) {
+    int64_t full = 0;
+
+    // Matches A - ensure all threads have seen new value of version,
+    // *and* that we see current values of counters in readFull()
+    //
+    // Note that in lock_shared if a reader is currently between the
+    // version load and counter increment, they may update the wrong
+    // epoch.  However, this is ok - they started concurrently *after*
+    // any callbacks that will run, and therefore it is safe to run
+    // the callbacks.
+    folly::asymmetricHeavyBarrier();
+    for (auto& i : cs_.accessAllThreads()) {
+      full -= i.dec_[epoch].load(std::memory_order_relaxed);
+    }
+
+    // Matches B - ensure that all increments are seen if decrements
+    // are seen. This is necessary because increment and decrement
+    // are allowed to happen on different threads.
+    folly::asymmetricHeavyBarrier();
+
+    auto accessor = cs_.accessAllThreads();
+    for (auto& i : accessor) {
+      full += i.inc_[epoch].load(std::memory_order_relaxed);
+    }
+
+    // orphan is read behind accessAllThreads lock
+    auto res = full + orphan_inc_[epoch] - orphan_dec_[epoch];
+    return res;
+  }
+
+  void waitForZero(uint8_t phase) {
+    // Try reading before futex sleeping.
+    if (readFull(phase) == 0) {
+      return;
+    }
+
+    while (true) {
+      waiting_.store(1, std::memory_order_release);
+      // Matches C.  Ensure either decrement sees waiting_,
+      // or we see their decrement and can safely sleep.
+      folly::asymmetricHeavyBarrier();
+      if (readFull(phase) == 0) {
+        break;
+      }
+      waiting_.futexWait(1);
+    }
+    waiting_.store(0, std::memory_order_relaxed);
+  }
+
+  // We are guaranteed to be called while StaticMeta lock is still
+  // held because of ordering in AtForkList.  We can therefore safely
+  // touch orphan_ and clear out all counts.
+  void resetAfterFork() {
+    if (int_cache_) {
+      int_cache_->dec_[0].store(0, std::memory_order_relaxed);
+      int_cache_->dec_[1].store(0, std::memory_order_relaxed);
+      int_cache_->inc_[0].store(0, std::memory_order_relaxed);
+      int_cache_->inc_[1].store(0, std::memory_order_relaxed);
+    }
+    orphan_inc_[0] = 0;
+    orphan_inc_[1] = 0;
+    orphan_dec_[0] = 0;
+    orphan_dec_[1] = 0;
+  }
+};
+
+template <typename Tag>
+thread_local typename detail::ThreadCachedInts<Tag>::Integer*
+    detail::ThreadCachedInts<Tag>::int_cache_{nullptr};
+
+} // namespace detail
+} // namespace folly
diff --git a/folly/synchronization/detail/ThreadCachedLists.h b/folly/synchronization/detail/ThreadCachedLists.h
new file mode 100644 (file)
index 0000000..6161d4e
--- /dev/null
@@ -0,0 +1,189 @@
+/*
+ * Copyright 2017-present Facebook, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#pragma once
+
+#include <atomic>
+
+#include <folly/Function.h>
+#include <folly/ThreadLocal.h>
+#include <glog/logging.h>
+
+namespace folly {
+
+namespace detail {
+
+// This is a thread-local cached, multi-producer single-consumer
+// queue, similar to a concurrent version of std::list.
+//
+class ThreadCachedListsBase {
+ public:
+  struct Node {
+    folly::Function<void()> cb_;
+    Node* next_{nullptr};
+  };
+};
+
+template <typename Tag>
+class ThreadCachedLists : public ThreadCachedListsBase {
+ public:
+  struct AtomicListHead {
+    std::atomic<Node*> tail_{nullptr};
+    std::atomic<Node*> head_{nullptr};
+  };
+
+  // Non-concurrent list, similar to std::list.
+  struct ListHead {
+    Node* head_{nullptr};
+    Node* tail_{nullptr};
+
+    // Run func on each list node.
+    template <typename Func>
+    void forEach(Func func) {
+      auto node = tail_;
+      while (node != nullptr) {
+        auto next = node->next_;
+        func(node);
+        node = next;
+      }
+    }
+
+    // Splice other in to this list.
+    // Afterwards, other is a valid empty listhead.
+    void splice(ListHead& other);
+
+    void splice(AtomicListHead& other);
+  };
+
+  // Push a node on a thread-local list.  Returns true if local list
+  // was pushed global.
+  void push(Node* node);
+
+  // Collect all thread local lists to a single local list.
+  // This function is threadsafe with concurrent push()es,
+  // but only a single thread may call collect() at a time.
+  void collect(ListHead& list);
+
+ private:
+  // Push list to the global list.
+  void pushGlobal(ListHead& list);
+
+  ListHead ghead_;
+
+  struct TLHead : public AtomicListHead {
+    ThreadCachedLists* parent_;
+
+   public:
+    TLHead(ThreadCachedLists* parent) : parent_(parent) {}
+
+    ~TLHead() {
+      parent_->ghead_.splice(*this);
+    }
+  };
+
+  folly::ThreadLocalPtr<TLHead, Tag> lhead_;
+};
+
+// push() and splice() are optimistic w.r.t setting the list head: The
+// first pusher cas's the list head, which functions as a lock until
+// tail != null.  The first pusher then sets tail_ = head_.
+//
+// splice() does the opposite: steals the tail_ via exchange, then
+// unlocks the list again by setting head_ to null.
+template <typename Tag>
+void ThreadCachedLists<Tag>::push(Node* node) {
+  DCHECK(node->next_ == nullptr);
+  static thread_local TLHead* cache_{nullptr};
+
+  if (!cache_) {
+    auto l = lhead_.get();
+    if (!l) {
+      lhead_.reset(new TLHead(this));
+      l = lhead_.get();
+      DCHECK(l);
+    }
+    cache_ = l;
+  }
+
+  while (true) {
+    auto head = cache_->head_.load(std::memory_order_relaxed);
+    if (!head) {
+      node->next_ = nullptr;
+      if (cache_->head_.compare_exchange_weak(head, node)) {
+        cache_->tail_.store(node);
+        break;
+      }
+    } else {
+      auto tail = cache_->tail_.load(std::memory_order_relaxed);
+      if (tail) {
+        node->next_ = tail;
+        if (cache_->tail_.compare_exchange_weak(node->next_, node)) {
+          break;
+        }
+      }
+    }
+  }
+}
+
+template <typename Tag>
+void ThreadCachedLists<Tag>::collect(ListHead& list) {
+  auto acc = lhead_.accessAllThreads();
+
+  for (auto& thr : acc) {
+    list.splice(thr);
+  }
+
+  list.splice(ghead_);
+}
+
+template <typename Tag>
+void ThreadCachedLists<Tag>::ListHead::splice(ListHead& other) {
+  if (other.head_ != nullptr) {
+    DCHECK(other.tail_ != nullptr);
+  } else {
+    DCHECK(other.tail_ == nullptr);
+    return;
+  }
+
+  if (head_) {
+    DCHECK(tail_ != nullptr);
+    DCHECK(head_->next_ == nullptr);
+    head_->next_ = other.tail_;
+    head_ = other.head_;
+  } else {
+    DCHECK(head_ == nullptr);
+    head_ = other.head_;
+    tail_ = other.tail_;
+  }
+
+  other.head_ = nullptr;
+  other.tail_ = nullptr;
+}
+
+template <typename Tag>
+void ThreadCachedLists<Tag>::ListHead::splice(AtomicListHead& list) {
+  ListHead local;
+
+  auto tail = list.tail_.load();
+  if (tail) {
+    local.tail_ = list.tail_.exchange(nullptr);
+    local.head_ = list.head_.exchange(nullptr);
+    splice(local);
+  }
+}
+
+} // namespace detail
+} // namespace folly
diff --git a/folly/synchronization/test/RcuTest.cpp b/folly/synchronization/test/RcuTest.cpp
new file mode 100644 (file)
index 0000000..5df5b0f
--- /dev/null
@@ -0,0 +1,276 @@
+/*
+ * Copyright 2017-present Facebook, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#include <folly/synchronization/Rcu.h>
+
+#include <thread>
+#include <vector>
+
+#include <glog/logging.h>
+
+#include <folly/Benchmark.h>
+#include <folly/Random.h>
+#include <folly/portability/GFlags.h>
+#include <folly/portability/GTest.h>
+#include <folly/synchronization/Baton.h>
+
+using namespace folly;
+
+DEFINE_int64(iters, 100000, "Number of iterations");
+DEFINE_int64(threads, 32, "Number of threads");
+
+TEST(RcuTest, Basic) {
+  auto foo = new int(2);
+  rcu_retire(foo);
+}
+
+class des {
+  bool* d_;
+
+ public:
+  des(bool* d) : d_(d) {}
+  ~des() {
+    *d_ = true;
+  }
+};
+
+TEST(RcuTest, Guard) {
+  bool del = false;
+  auto foo = new des(&del);
+  { rcu_reader g; }
+  rcu_retire(foo);
+  synchronize_rcu();
+  EXPECT_TRUE(del);
+}
+
+TEST(RcuTest, Perf) {
+  long i = FLAGS_iters;
+  auto start = std::chrono::steady_clock::now();
+  while (i-- > 0) {
+    rcu_reader g;
+  }
+  auto diff = std::chrono::steady_clock::now() - start;
+  printf(
+      "Total time %li ns \n",
+      std::chrono::duration_cast<std::chrono::nanoseconds>(diff).count() /
+          FLAGS_iters);
+}
+
+TEST(RcuTest, ResetPerf) {
+  long i = FLAGS_iters;
+  auto start = std::chrono::steady_clock::now();
+  while (i-- > 0) {
+    rcu_retire<int>(nullptr, [](int*) {});
+  }
+  auto diff = std::chrono::steady_clock::now() - start;
+  printf(
+      "Total time %li ns \n",
+      std::chrono::duration_cast<std::chrono::nanoseconds>(diff).count() /
+          FLAGS_iters);
+}
+
+TEST(RcuTest, SlowReader) {
+  std::thread t;
+  {
+    rcu_reader g;
+
+    t = std::thread([&]() { synchronize_rcu(); });
+    usleep(100); // Wait for synchronize to start
+  }
+  t.join();
+}
+
+rcu_reader tryretire(des* obj) {
+  rcu_reader g;
+  rcu_retire(obj);
+  return g;
+}
+
+TEST(RcuTest, CopyGuard) {
+  bool del = false;
+  auto foo = new des(&del);
+  {
+    auto res = tryretire(foo);
+    EXPECT_FALSE(del);
+  }
+  rcu_barrier();
+  EXPECT_TRUE(del);
+}
+
+TEST(RcuTest, Stress) {
+  std::vector<std::thread> threads;
+  constexpr uint32_t sz = 1000;
+  std::atomic<int*> ints[sz];
+  for (uint i = 0; i < sz; i++) {
+    ints[i].store(new int(0));
+  }
+  for (int th = 0; th < FLAGS_threads; th++) {
+    threads.push_back(std::thread([&]() {
+      for (int i = 0; i < FLAGS_iters / 100; i++) {
+        rcu_reader g;
+        int sum = 0;
+        int* ptrs[sz];
+        for (uint j = 0; j < sz; j++) {
+          ptrs[j] = ints[j].load(std::memory_order_acquire);
+        }
+        for (uint j = 0; j < sz; j++) {
+          sum += *ptrs[j];
+        }
+        EXPECT_EQ(sum, 0);
+      }
+    }));
+  }
+  std::atomic<bool> done{false};
+  std::thread updater([&]() {
+    while (!done.load()) {
+      auto newint = new int(0);
+      auto oldint = ints[folly::Random::rand32() % sz].exchange(newint);
+      rcu_retire<int>(oldint, [](int* obj) {
+        *obj = folly::Random::rand32();
+        delete obj;
+      });
+    }
+  });
+  for (auto& t : threads) {
+    t.join();
+  }
+  done = true;
+  updater.join();
+}
+
+TEST(RcuTest, Synchronize) {
+  std::vector<std::thread> threads;
+  for (int th = 0; th < FLAGS_threads; th++) {
+    threads.push_back(std::thread([&]() {
+      for (int i = 0; i < 10; i++) {
+        synchronize_rcu();
+      }
+    }));
+  }
+  for (auto& t : threads) {
+    t.join();
+  }
+}
+
+TEST(RcuTest, NewDomainTest) {
+  struct UniqueTag;
+  rcu_domain<UniqueTag> newdomain(nullptr);
+  synchronize_rcu();
+}
+
+TEST(RcuTest, MovableReader) {
+  {
+    rcu_reader g;
+    rcu_reader f(std::move(g));
+  }
+  synchronize_rcu();
+  {
+    rcu_reader g(std::defer_lock);
+    rcu_reader f;
+    g = std::move(f);
+  }
+  synchronize_rcu();
+}
+
+TEST(RcuTest, SynchronizeInCall) {
+  rcu_default_domain()->call([]() { synchronize_rcu(); });
+  synchronize_rcu();
+}
+
+TEST(RcuTest, MoveReaderBetweenThreads) {
+  rcu_reader g;
+  std::thread t([f = std::move(g)] {});
+  t.join();
+  synchronize_rcu();
+}
+
+TEST(RcuTest, ForkTest) {
+  folly::Baton<> b;
+  rcu_token epoch;
+  std::thread t([&]() {
+    epoch = rcu_default_domain()->lock_shared();
+    b.post();
+  });
+  t.detach();
+  b.wait();
+  auto pid = fork();
+  if (pid) {
+    // parent
+    rcu_default_domain()->unlock_shared(std::move(epoch));
+    synchronize_rcu();
+    int status;
+    auto pid2 = wait(&status);
+    EXPECT_EQ(status, 0);
+    EXPECT_EQ(pid, pid2);
+  } else {
+    // child
+    synchronize_rcu();
+    exit(0); // Do not print gtest results
+  }
+}
+
+TEST(RcuTest, CoreLocalList) {
+  struct TTag;
+  folly::detail::ThreadCachedLists<TTag> lists;
+  int numthreads = 32;
+  std::vector<std::thread> threads;
+  std::atomic<int> done{0};
+  for (int tr = 0; tr < numthreads; tr++) {
+    threads.push_back(std::thread([&]() {
+      for (int i = 0; i < FLAGS_iters; i++) {
+        auto node = new folly::detail::ThreadCachedListsBase::Node;
+        lists.push(node);
+      }
+      done++;
+    }));
+  }
+  while (done.load() != numthreads) {
+    folly::detail::ThreadCachedLists<TTag>::ListHead list{};
+    lists.collect(list);
+    list.forEach([](folly::detail::ThreadCachedLists<TTag>::Node* node) {
+      delete node;
+    });
+  }
+  for (auto& thread : threads) {
+    thread.join();
+  }
+}
+
+TEST(RcuTest, ThreadDeath) {
+  bool del = false;
+  std::thread t([&] {
+    auto foo = new des(&del);
+    rcu_retire(foo);
+  });
+  t.join();
+  synchronize_rcu();
+  EXPECT_TRUE(del);
+}
+
+TEST(RcuTest, RcuObjBase) {
+  bool retired = false;
+  struct base_test : rcu_obj_base<base_test> {
+    bool* ret_;
+    base_test(bool* ret) : ret_(ret) {}
+    ~base_test() {
+      (*ret_) = true;
+    }
+  };
+
+  auto foo = new base_test(&retired);
+  foo->retire();
+  synchronize_rcu();
+  EXPECT_TRUE(retired);
+}