--- /dev/null
+/*
+ * Copyright 2017-present Facebook, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include <folly/Function.h>
+#include <folly/detail/AtFork.h>
+#include <folly/detail/TurnSequencer.h>
+
+namespace folly {
+
+template <typename Tag>
+bool rcu_domain<Tag>::singleton_{false};
+
+template <typename Tag>
+rcu_domain<Tag>::rcu_domain(Executor* executor) noexcept
+ : executor_(executor ? executor : &QueuedImmediateExecutor::instance()) {
+ // Please use a unique tag for each domain.
+ CHECK(!singleton_);
+ singleton_ = true;
+
+ // Register fork handlers. Holding read locks across fork is not
+ // supported. Using read locks in other atfork handlers is not
+ // supported. Other atfork handlers launching new child threads
+ // that use read locks *is* supported.
+ detail::AtFork::registerHandler(
+ this,
+ // Prepare handler: report whether the sync mutex was acquired.
+ [this]() { return syncMutex_.try_lock(); },
+ [this]() { syncMutex_.unlock(); },
+ [this]() {
+ counters_.resetAfterFork();
+ syncMutex_.unlock();
+ });
+}
+
+template <typename Tag>
+rcu_domain<Tag>::~rcu_domain() {
+ detail::AtFork::unregisterHandler(this);
+}
+
+template <typename Tag>
+rcu_token rcu_domain<Tag>::lock_shared() {
+ auto idx = version_.load(std::memory_order_acquire);
+ idx &= 1;
+ counters_.increment(idx);
+
+ return idx;
+}
+
+template <typename Tag>
+void rcu_domain<Tag>::unlock_shared(rcu_token&& token) {
+ DCHECK(0 == token.epoch_ || 1 == token.epoch_);
+ counters_.decrement(token.epoch_);
+}
+
+template <typename Tag>
+template <typename T>
+void rcu_domain<Tag>::call(T&& cbin) {
+ auto node = new list_node;
+ node->cb_ = [node, cb = std::forward<T>(cbin)]() {
+ cb();
+ delete node;
+ };
+ retire(node);
+}
+
+template <typename Tag>
+void rcu_domain<Tag>::retire(list_node* node) noexcept {
+ q_.push(node);
+
+ // Note that it's likely we hold a read lock here,
+ // so we can only half_sync(false). half_sync(true)
+ // or a synchronize() call might block forever.
+ uint64_t time = std::chrono::duration_cast<std::chrono::milliseconds>(
+ std::chrono::steady_clock::now().time_since_epoch())
+ .count();
+ if (time > syncTime_.load(std::memory_order_relaxed) + syncTimePeriod_) {
+ list_head finished;
+ {
+ std::lock_guard<std::mutex> g(syncMutex_);
+ syncTime_.store(time, std::memory_order_relaxed);
+ half_sync(false, finished);
+ }
+ // callbacks are called outside of syncMutex_
+ finished.forEach(
+ [&](list_node* item) { executor_->add(std::move(item->cb_)); });
+ }
+}
+
+template <typename Tag>
+void rcu_domain<Tag>::synchronize() noexcept {
+ auto curr = version_.load(std::memory_order_acquire);
+ // Target is two epochs away.
+ auto target = curr + 2;
+ while (true) {
+ // Try to assign ourselves to do the sync work.
+ // If someone else is already assigned, we can wait for
+ // the work to be finished by waiting on turn_.
+ auto work = work_.load(std::memory_order_acquire);
+ auto tmp = work;
+ if (work < target && work_.compare_exchange_strong(tmp, target)) {
+ list_head finished;
+ {
+ std::lock_guard<std::mutex> g(syncMutex_);
+ while (version_.load(std::memory_order_acquire) < target) {
+ half_sync(true, finished);
+ }
+ }
+ // callbacks are called outside of syncMutex_
+ finished.forEach(
+ [&](list_node* node) { executor_->add(std::move(node->cb_)); });
+ return;
+ } else {
+ if (version_.load(std::memory_order_acquire) >= target) {
+ return;
+ }
+ std::atomic<uint32_t> cutoff{100};
+ // Wait for someone to finish the work.
+ turn_.tryWaitForTurn(work, cutoff, false);
+ }
+ }
+}
+
+/*
+ * Not multithread safe, but it could be with proper version
+ * checking and stronger increment of version. See
+ * https://github.com/pramalhe/ConcurrencyFreaks/blob/master/papers/gracesharingurcu-2016.pdf
+ *
+ * This version, however, can go to sleep if there are outstanding
+ * readers, and does not spin or need rescheduling, unless blocking = false.
+ */
+template <typename Tag>
+void rcu_domain<Tag>::half_sync(bool blocking, list_head& finished) {
+ uint64_t curr = version_.load(std::memory_order_acquire);
+ auto next = curr + 1;
+
+ // Push all work to a queue for moving through two epochs. One
+ // version is not enough because of late readers of the version_
+ // counter in lock_shared.
+ //
+ // Note that for a similar reason we can't swap out the q here,
+ // and instead drain it, so concurrent calls to call() are safe,
+ // and will wait for the next epoch.
+ q_.collect(queues_[0]);
+
+ if (blocking) {
+ counters_.waitForZero(next & 1);
+ } else {
+ if (counters_.readFull(next & 1) != 0) {
+ return;
+ }
+ }
+
+ // Run callbacks that have been through two epochs, and swap queues
+ // for those only through a single epoch.
+ finished.splice(queues_[1]);
+ queues_[1].splice(queues_[0]);
+
+ version_.store(next, std::memory_order_release);
+ // Notify synchronous waiters in synchronize().
+ turn_.completeTurn(curr);
+}
+
+} // namespace folly
--- /dev/null
+/*
+ * Copyright 2017-present Facebook, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#include <folly/synchronization/Rcu.h>
+
+namespace folly {
+
+FOLLY_STATIC_CTOR_PRIORITY_MAX rcu_domain<RcuTag> rcu_default_domain_;
+
+} // namespace folly
--- /dev/null
+/*
+ * Copyright 2017-present Facebook, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#pragma once
+
+#include <atomic>
+#include <functional>
+#include <limits>
+
+#include <folly/Optional.h>
+#include <folly/detail/TurnSequencer.h>
+#include <folly/executors/QueuedImmediateExecutor.h>
+#include <folly/synchronization/detail/ThreadCachedInts.h>
+#include <folly/synchronization/detail/ThreadCachedLists.h>
+
+// Implementation of proposed RCU C++ API
+// http://open-std.org/JTC1/SC22/WG21/docs/papers/2017/p0566r3.pdf
+
+// Overview
+
+// This file provides a low-overhead mechanism to guarantee ordering
+// between operations on shared data. In the simplest usage pattern,
+// readers enter a critical section, view some state, and leave the
+// critical section, while writers modify shared state and then defer
+// some cleanup operations. Proper use of these classes will guarantee
+// that a cleanup operation that is deferred during a reader critical
+// section will not be executed until after that critical section is
+// over.
+
+// Example
+
+// As an example, suppose we have some configuration data that gets
+// periodically updated. We need to protect ourselves on *every* read
+// path, even if updates are only vanishingly rare, because we don't know
+// if some writer will come along and yank the data out from under us.
+//
+// Here's how that might look without deferral:
+//
+// void doSomething(IPAddress host);
+//
+// folly::SharedMutex sm;
+// ConfigData* globalConfigData;
+//
+// void reader() {
+// while (true) {
+// sm.lock_shared();
+// IPAddress curManagementServer = globalConfigData->managementServerIP;
+// sm.unlock_shared();
+// doSomethingWith(curManagementServer);
+// }
+// }
+//
+// void writer() {
+// while (true) {
+// std::this_thread::sleep_for(std::chrono::seconds(60));
+// ConfigData* oldConfigData = globalConfigData;
+// ConfigData* newConfigData = loadConfigDataFromRemoteServer();
+// sm.lock();
+// globalConfigData = newConfigData;
+// sm.unlock();
+// delete oldConfigData;
+// }
+// }
+//
+// The readers have to lock_shared and unlock_shared every iteration, even
+// during the overwhelming majority of the time in which there's no writer
+// present. These functions are surprisingly expensive; there's around 30ns of
+// overhead per critical section on my machine.
+//
+// Let's get rid of the locking. The readers and writer may proceed
+// concurrently. Here's how this would look:
+//
+// void doSomething(IPAddress host);
+//
+// std::atomic<ConfigData*> globalConfigData;
+//
+// void reader() {
+// while (true) {
+// ConfigData* configData = globalConfigData.load();
+// IPAddress curManagementServer = configData->managementServerIP;
+// doSomethingWith(curManagementServer);
+// }
+// }
+//
+// void writer() {
+// while (true) {
+// std::this_thread::sleep_for(std::chrono::seconds(60));
+// ConfigData* newConfigData = loadConfigDataFromRemoteServer();
+// globalConfigData.store(newConfigData);
+// // We can't delete the old ConfigData; we don't know when the readers
+// // will be done with it! I guess we have to leak it...
+// }
+// }
+//
+// This works and is fast, but we don't ever reclaim the memory we
+// allocated for the copy of the data. We need a way for the writer to
+// know that no readers observed the old value of the pointer and are
+// still using it. Tweaking this slightly, we want a way for the
+// writer to say "I want some operation (deleting the old ConfigData)
+// to happen, but only after I know that all readers that started
+// before I requested the action have finished." The classes in this
+// namespace allow this. Here's how we'd express this idea:
+//
+// void doSomething(IPAddress host);
+// std::atomic<ConfigData*> globalConfigData;
+//
+//
+// void reader() {
+// while (true) {
+// IPAddress curManagementServer;
+// {
+// // We're about to do some reads we want to protect; if we read a
+// // pointer, we need to make sure that if the writer comes along and
+// // updates it, the writer's cleanup operation won't happen until we're
+// // done accessing the pointed-to data. We get a guard (rcu_reader) on the
+// // default domain; as long as it exists, no function subsequently passed
+// // to rcu_retire() or call() on that domain will execute.
+// rcu_reader guard;
+// ConfigData* configData = globalConfigData.load();
+// // We created a guard before we read globalConfigData; we know that the
+// // pointer will remain valid until the guard is destroyed.
+// curManagementServer = configData->managementServerIP;
+// // Guard is released here; retired objects may be freed.
+// }
+// doSomethingWith(curManagementServer);
+// }
+// }
+//
+// void writer() {
+//
+// while (true) {
+// std::this_thread::sleep_for(std::chrono::seconds(60));
+// ConfigData* oldConfigData = globalConfigData.load();
+// ConfigData* newConfigData = loadConfigDataFromRemoteServer();
+// globalConfigData.store(newConfigData);
+// rcu_retire(oldConfigData);
+// // alternatively, in a blocking manner:
+// // synchronize_rcu();
+// // delete oldConfigData;
+// }
+// }
+//
+// This gets us close to the speed of the second solution, without
+// leaking memory. An rcu_reader costs about 4 ns, faster than the
+// lock_shared() / unlock_shared() pair in the more traditional
+// mutex-based approach from our first attempt, and the writer
+// never blocks the readers.
+
+// Notes
+
+// This implementation provides an rcu_domain, and a default domain instance
+// for general use, per the proposed standard API.
+//
+// rcu_domain:
+// A "universe" of deferred execution. Each rcu_domain has an
+// executor on which deferred functions may execute. Readers obtain
+// Tokens from an rcu_domain and release them back to it.
+// rcu_domains should in general be completely separated; it's never
+// correct to pass a token from one domain to another, and
+// rcu_reader objects created on one domain do not prevent functions
+// deferred on other domains from running. It's intended that most
+// callers should only ever use the default, global domain.
+//
+// Creation of a domain takes a template tag argument, which
+// defaults to void. To access different domains, you have to pass a
+// different tag. The global domain is preferred for almost all
+// purposes, unless a different executor is required.
+//
+// You should use a custom rcu_domain if you can't avoid sleeping
+// during reader critical sections (because you don't want to block
+// all other users of the domain while you sleep), or you want to change
+// the default executor type.
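+//
+// As an illustrative sketch only (MyWorkloadTag and Widget are hypothetical
+// names, not part of this API), a custom domain with its own tag might be
+// used like this:
+//
+//   struct MyWorkloadTag;
+//
+//   folly::rcu_domain<MyWorkloadTag> myDomain(nullptr); // nullptr -> default executor
+//
+//   void readerUsingMyDomain() {
+//     auto token = myDomain.lock_shared();
+//     // ... read state protected by myDomain ...
+//     myDomain.unlock_shared(std::move(token));
+//   }
+//
+//   void writerUsingMyDomain(Widget* old) {
+//     myDomain.call([old]() { delete old; }); // deferred deletion
+//   }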
+
+// API correctness limitations:
+// - Exceptions:
+// In short, nothing about this is exception safe. retire functions should
+// not throw exceptions in their destructors, move constructors or call
+// operators.
+//
+// Performance limitations:
+// - Blocking:
+// A blocked reader will block invocation of deferred functions until it
+// becomes unblocked. Sleeping while holding an rcu_reader can have bad
+// performance consequences.
+//
+// API questions you might have:
+// - Nested critical sections:
+// These are fine. The following is explicitly allowed by the standard, up to
+// a nesting level of 100:
+// rcu_reader reader1;
+// doSomeWork();
+// rcu_reader reader2;
+// doSomeMoreWork();
+// - Restrictions on retire()d functions:
+// Any operation is safe from within a retired function's
+// execution; you can retire additional objects or add new calls to
+// the domain.
+// - rcu_domain destruction:
+// Destruction of a domain assumes previous synchronization: all remaining
+// call() and retire() callbacks are immediately added to the executor.
+
+// Overheads
+
+// Deferral latency is as low as is practical: overhead involves running
+// several global memory barriers on the machine to ensure all readers are gone.
+//
+// Currently, use of MPMCQueue is the bottleneck for deferred calls; more
+// specialized queues could be used if available, since only a single reader
+// reads the queue and can splice all of the items to the executor at once.
+//
+// synchronize_rcu() call latency is on the order of 10ms. Multiple
+// separate threads can share a synchronized period and should scale.
+//
+// rcu_retire() is a queue push on the order of 150 ns; however, the current
+// implementation may synchronize if the retire queue is full, resulting in
+// tail latencies of ~10ms.
+//
+// rcu_reader creation/destruction is ~4ns. By comparison, a
+// folly::SharedMutex::lock_shared + unlock_shared pair is ~26ns.
+
+// Hazard pointers vs. RCU:
+//
+// Hazard pointers protect pointers, generally malloc()d pieces of memory, and
+// each hazptr only protects one such block.
+//
+// RCU protects critical sections; *all* memory is protected as long
+// as the critical section is active.
+//
+// For example, this has implications for linked lists: with RCU, if you wish
+// to free an entire linked list, you simply rcu_retire() each node; readers
+// will see either an entirely valid list, or no list at all.
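+//
+// As a sketch of that pattern (Node and listHead are hypothetical, and the
+// nodes are assumed to have been allocated with new):
+//
+//   struct Node { int value; Node* next; };
+//   std::atomic<Node*> listHead;
+//
+//   void retireWholeList() {
+//     Node* n = listHead.exchange(nullptr); // readers now see an empty list
+//     while (n != nullptr) {
+//       Node* next = n->next;
+//       folly::rcu_retire(n); // freed only after concurrent readers finish
+//       n = next;
+//     }
+//   }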
+//
+// Conversely, with hazptrs, lists are generally walked with hand-over-hand
+// hazptrs. Only one node is protected at a time, so anywhere in the middle
+// of the list a hazptr may read NULL and have to throw away all current
+// work. Alternatively, reference counting can be used (as if each node were
+// a shared_ptr<node>), so that readers will always see *the remaining part*
+// of the list as valid; however, parts before their current hazptr may be
+// freed.
+//
+// So roughly: RCU is simple, but an all-or-nothing affair. A single
+// rcu_reader can block all reclamation. Hazptrs will reclaim exactly as much
+// as possible, at the cost of extra work writing traversal code.
+//
+// Reproduced from
+// http://www.open-std.org/jtc1/sc22/wg21/docs/papers/2017/p0461r1.pdf
+//
+//                                 Reference Counting  RCU            Hazptr
+//
+// Unreclaimed objects             Bounded             Unbounded      Bounded
+//
+// Contention among readers        High                None           None
+//
+// Traversal forward progress      lock-free           wait-free      lock-free
+//
+// Reclamation forward progress    lock-free           blocking       wait-free
+//
+// Traversal speed                 atomic              no-overhead    folly's is
+//                                                                    no-overhead
+//
+// Reference acquisition           unconditional       unconditional  conditional
+//
+// Automatic reclamation           yes                 no             no
+//
+// Purpose of domains              N/A                 isolate slow   configuration
+//                                                     readers
+
+namespace folly {
+
+struct RcuTag;
+
+template <typename Tag = RcuTag>
+class rcu_domain;
+
+// Opaque token used to match up lock_shared() and unlock_shared()
+// pairs.
+class rcu_token {
+ public:
+ rcu_token(uint64_t epoch) : epoch_(epoch) {}
+ rcu_token() {}
+ ~rcu_token() = default;
+
+ rcu_token(const rcu_token&) = delete;
+ rcu_token(rcu_token&& other) = default;
+ rcu_token& operator=(const rcu_token& other) = delete;
+ rcu_token& operator=(rcu_token&& other) = default;
+
+ private:
+ template <typename Tag>
+ friend class rcu_domain;
+ uint64_t epoch_;
+};
+
+// For most usages, rcu_domain is unnecessary, and you can use
+// rcu_reader and rcu_retire/synchronize_rcu directly.
+template <typename Tag>
+class rcu_domain {
+ using list_head = typename detail::ThreadCachedLists<Tag>::ListHead;
+ using list_node = typename detail::ThreadCachedLists<Tag>::Node;
+
+ public:
+ /*
+ * If an executor is passed, it is used to run calls and delete
+ * retired objects.
+ */
+ rcu_domain(Executor* executor = nullptr) noexcept;
+
+ rcu_domain(const rcu_domain&) = delete;
+ rcu_domain(rcu_domain&&) = delete;
+ rcu_domain& operator=(const rcu_domain&) = delete;
+ rcu_domain& operator=(rcu_domain&&) = delete;
+ ~rcu_domain();
+
+ // Reader locks: Prevent any calls from occurring, retired memory
+ // from being freed, and synchronize() calls from completing until
+ // all preceding lock_shared() sections are finished.
+
+ // Note: may allocate on a thread's first use.
+ FOLLY_ALWAYS_INLINE rcu_token lock_shared();
+ FOLLY_ALWAYS_INLINE void unlock_shared(rcu_token&&);
+
+ // Call a function after concurrent critical sections have finished.
+ // Does not block unless the queue is full, in which case it may block
+ // to wait for the scheduler thread; it generally does not wait for full
+ // synchronization.
+ template <typename T>
+ void call(T&& cbin);
+ void retire(list_node* node) noexcept;
+
+ // Ensure concurrent critical sections have finished.
+ // Always waits for full synchronization.
+ // A read lock *must not* be held.
+ void synchronize() noexcept;
+
+ private:
+ detail::ThreadCachedInts<Tag> counters_;
+ // Global epoch.
+ std::atomic<uint64_t> version_{0};
+ // Future epochs being driven by threads in synchronize
+ std::atomic<uint64_t> work_{0};
+ // Matches version_, for waking up threads in synchronize that are
+ // sharing an epoch.
+ detail::TurnSequencer<std::atomic> turn_;
+ // Only one thread can drive half_sync.
+ std::mutex syncMutex_;
+ // Currently half_sync takes ~16ms due to heavy barriers.
+ // Ensure that if used in a single thread, max GC time
+ // is limited to 1% of total CPU time.
+ static constexpr uint64_t syncTimePeriod_{1600 * 2 /* full sync is 2x */};
+ std::atomic<uint64_t> syncTime_{0};
+ // call()s waiting to move through two epochs.
+ detail::ThreadCachedLists<Tag> q_;
+ // Executor that callbacks will eventually be run on.
+ Executor* executor_{nullptr};
+ static bool singleton_; // Ensure uniqueness per-tag.
+
+ // Queues for callbacks waiting to go through two epochs.
+ list_head queues_[2]{};
+
+ // Move queues through one epoch (half of a full synchronize()).
+ // Will block waiting for readers to exit if blocking is true.
+ // blocking must *not* be true if the current thread holds a read
+ // lock, or it will deadlock.
+ //
+ // returns a list of callbacks ready to run in cbs.
+ void half_sync(bool blocking, list_head& cbs);
+};
+
+extern rcu_domain<RcuTag> rcu_default_domain_;
+
+inline rcu_domain<RcuTag>* rcu_default_domain() {
+ return &rcu_default_domain_;
+}
+
+// Main reader guard class.
+class rcu_reader {
+ public:
+ FOLLY_ALWAYS_INLINE rcu_reader() noexcept
+ : epoch_(rcu_default_domain()->lock_shared()) {}
+ rcu_reader(std::defer_lock_t) noexcept {}
+ rcu_reader(const rcu_reader&) = delete;
+ rcu_reader(rcu_reader&& other) noexcept : epoch_(std::move(other.epoch_)) {}
+ rcu_reader& operator=(const rcu_reader&) = delete;
+ rcu_reader& operator=(rcu_reader&& other) noexcept {
+ if (epoch_.has_value()) {
+ rcu_default_domain()->unlock_shared(std::move(epoch_.value()));
+ }
+ epoch_ = std::move(other.epoch_);
+ return *this;
+ }
+
+ FOLLY_ALWAYS_INLINE ~rcu_reader() noexcept {
+ if (epoch_.has_value()) {
+ rcu_default_domain()->unlock_shared(std::move(epoch_.value()));
+ }
+ }
+
+ void swap(rcu_reader& other) noexcept {
+ std::swap(epoch_, other.epoch_);
+ }
+
+ FOLLY_ALWAYS_INLINE void lock() noexcept {
+ DCHECK(!epoch_.has_value());
+ epoch_ = rcu_default_domain()->lock_shared();
+ }
+
+ FOLLY_ALWAYS_INLINE void unlock() noexcept {
+ DCHECK(epoch_.has_value());
+ rcu_default_domain()->unlock_shared(std::move(epoch_.value()));
+ // Reset the token so the destructor (or a later lock()) does not
+ // observe a stale epoch and decrement the counter twice.
+ epoch_.reset();
+ }
+
+ private:
+ Optional<rcu_token> epoch_;
+};
+
+inline void swap(rcu_reader& a, rcu_reader& b) noexcept {
+ a.swap(b);
+}
+
+inline void synchronize_rcu() noexcept {
+ rcu_default_domain()->synchronize();
+}
+
+inline void rcu_barrier() noexcept {
+ rcu_default_domain()->synchronize();
+}
+
+// Free-function retire. Always allocates.
+template <typename T, typename D = std::default_delete<T>>
+void rcu_retire(T* p, D d = {}) {
+ rcu_default_domain()->call([p, del = std::move(d)]() { del(p); });
+}
+
+// Base class for rcu objects. retire() will use preallocated storage
+// from rcu_obj_base, vs. rcu_retire() which always allocates.
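+//
+// A minimal usage sketch (Widget here is a hypothetical type, not part of
+// this API): inherit from rcu_obj_base and call retire() instead of delete:
+//
+//   struct Widget : folly::rcu_obj_base<Widget> {
+//     int value;
+//   };
+//
+//   void replaceWidget(std::atomic<Widget*>& current, Widget* next) {
+//     Widget* old = current.exchange(next);
+//     old->retire(); // deleted only after concurrent readers are done
+//   }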
+template <typename T, typename D = std::default_delete<T>>
+class rcu_obj_base : detail::ThreadCachedListsBase::Node {
+ public:
+ void retire(D d = {}) {
+ // This implementation assumes folly::Function has enough
+ // inline storage for D; otherwise, it allocates.
+ this->cb_ = [this, d = std::move(d)]() { d(static_cast<T*>(this)); };
+ rcu_default_domain()->retire(this);
+ }
+};
+
+} // namespace folly
+
+#include <folly/synchronization/Rcu-inl.h>
--- /dev/null
+/*
+ * Copyright 2017-present Facebook, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#pragma once
+
+#include <folly/Function.h>
+#include <folly/ThreadLocal.h>
+#include <folly/synchronization/AsymmetricMemoryBarrier.h>
+
+// This is unlike folly::ThreadCachedInt in that the full value
+// is never rounded up globally and cached; it only supports readFull.
+//
+// folly/experimental/TLRefCount is similar, but does not support
+// waitForZero, and is not reset-able.
+//
+// Note that the RCU implementation is completely abstracted from the
+// counter implementation; an rseq implementation can be dropped in
+// if the kernel supports it.
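+//
+// A rough usage sketch of how the RCU domain drives these counters (MyTag is
+// hypothetical and `epoch` stands for the domain's version counter); this is
+// an internal detail, not a public API:
+//
+//   struct MyTag;
+//   folly::detail::ThreadCachedInts<MyTag> counts;
+//
+//   // Reader side: bump the per-thread count for the epoch's parity.
+//   counts.increment(epoch & 1);
+//   // ... read-side critical section ...
+//   counts.decrement(epoch & 1);
+//
+//   // Writer side: block until all readers of that parity have finished.
+//   counts.waitForZero(epoch & 1);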
+
+namespace folly {
+
+namespace detail {
+
+template <typename Tag>
+class ThreadCachedInts {
+ // These are only accessed under the ThreadLocal lock.
+ int64_t orphan_inc_[2]{0, 0};
+ int64_t orphan_dec_[2]{0, 0};
+ folly::detail::Futex<> waiting_;
+
+ class Integer {
+ public:
+ ThreadCachedInts* ints_;
+ constexpr Integer(ThreadCachedInts* ints) noexcept
+ : ints_(ints), inc_{}, dec_{} {}
+ std::atomic<int64_t> inc_[2];
+ std::atomic<int64_t> dec_[2];
+ ~Integer() noexcept {
+ ints_->orphan_inc_[0] += inc_[0].load(std::memory_order_relaxed);
+ ints_->orphan_inc_[1] += inc_[1].load(std::memory_order_relaxed);
+ ints_->orphan_dec_[0] += dec_[0].load(std::memory_order_relaxed);
+ ints_->orphan_dec_[1] += dec_[1].load(std::memory_order_relaxed);
+ ints_->waiting_.store(0, std::memory_order_release);
+ ints_->waiting_.futexWake();
+ }
+ };
+ folly::ThreadLocalPtr<Integer, Tag> cs_;
+
+ // Cache the int pointer in a threadlocal.
+ static thread_local Integer* int_cache_;
+
+ void init() {
+ auto ret = new Integer(this);
+ cs_.reset(ret);
+ int_cache_ = ret;
+ }
+
+ public:
+ FOLLY_ALWAYS_INLINE void increment(uint8_t epoch) {
+ if (!int_cache_) {
+ init();
+ }
+
+ auto& c = int_cache_->inc_[epoch];
+ auto val = c.load(std::memory_order_relaxed);
+ c.store(val + 1, std::memory_order_relaxed);
+
+ folly::asymmetricLightBarrier(); // A
+ }
+
+ FOLLY_ALWAYS_INLINE void decrement(uint8_t epoch) {
+ folly::asymmetricLightBarrier(); // B
+ if (!int_cache_) {
+ init();
+ }
+
+ auto& c = int_cache_->dec_[epoch];
+ auto val = c.load(std::memory_order_relaxed);
+ c.store(val + 1, std::memory_order_relaxed);
+
+ folly::asymmetricLightBarrier(); // C
+ if (waiting_.load(std::memory_order_acquire)) {
+ waiting_.store(0, std::memory_order_release);
+ waiting_.futexWake();
+ }
+ }
+
+ int64_t readFull(uint8_t epoch) {
+ int64_t full = 0;
+
+ // Matches A - ensure all threads have seen new value of version,
+ // *and* that we see current values of counters in readFull()
+ //
+ // Note that in lock_shared if a reader is currently between the
+ // version load and counter increment, they may update the wrong
+ // epoch. However, this is ok - they started concurrently *after*
+ // any callbacks that will run, and therefore it is safe to run
+ // the callbacks.
+ folly::asymmetricHeavyBarrier();
+ for (auto& i : cs_.accessAllThreads()) {
+ full -= i.dec_[epoch].load(std::memory_order_relaxed);
+ }
+
+ // Matches B - ensure that all increments are seen if decrements
+ // are seen. This is necessary because increment and decrement
+ // are allowed to happen on different threads.
+ folly::asymmetricHeavyBarrier();
+
+ auto accessor = cs_.accessAllThreads();
+ for (auto& i : accessor) {
+ full += i.inc_[epoch].load(std::memory_order_relaxed);
+ }
+
+ // orphan is read behind accessAllThreads lock
+ auto res = full + orphan_inc_[epoch] - orphan_dec_[epoch];
+ return res;
+ }
+
+ void waitForZero(uint8_t phase) {
+ // Try reading before futex sleeping.
+ if (readFull(phase) == 0) {
+ return;
+ }
+
+ while (true) {
+ waiting_.store(1, std::memory_order_release);
+ // Matches C. Ensure either decrement sees waiting_,
+ // or we see their decrement and can safely sleep.
+ folly::asymmetricHeavyBarrier();
+ if (readFull(phase) == 0) {
+ break;
+ }
+ waiting_.futexWait(1);
+ }
+ waiting_.store(0, std::memory_order_relaxed);
+ }
+
+ // We are guaranteed to be called while StaticMeta lock is still
+ // held because of ordering in AtForkList. We can therefore safely
+ // touch orphan_ and clear out all counts.
+ void resetAfterFork() {
+ if (int_cache_) {
+ int_cache_->dec_[0].store(0, std::memory_order_relaxed);
+ int_cache_->dec_[1].store(0, std::memory_order_relaxed);
+ int_cache_->inc_[0].store(0, std::memory_order_relaxed);
+ int_cache_->inc_[1].store(0, std::memory_order_relaxed);
+ }
+ orphan_inc_[0] = 0;
+ orphan_inc_[1] = 0;
+ orphan_dec_[0] = 0;
+ orphan_dec_[1] = 0;
+ }
+};
+
+template <typename Tag>
+thread_local typename detail::ThreadCachedInts<Tag>::Integer*
+ detail::ThreadCachedInts<Tag>::int_cache_{nullptr};
+
+} // namespace detail
+} // namespace folly
--- /dev/null
+/*
+ * Copyright 2017-present Facebook, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#pragma once
+
+#include <atomic>
+
+#include <folly/Function.h>
+#include <folly/ThreadLocal.h>
+#include <glog/logging.h>
+
+namespace folly {
+
+namespace detail {
+
+// This is a thread-cached, multi-producer, single-consumer queue,
+// similar to a concurrent version of std::list.
+//
+class ThreadCachedListsBase {
+ public:
+ struct Node {
+ folly::Function<void()> cb_;
+ Node* next_{nullptr};
+ };
+};
+
+template <typename Tag>
+class ThreadCachedLists : public ThreadCachedListsBase {
+ public:
+ struct AtomicListHead {
+ std::atomic<Node*> tail_{nullptr};
+ std::atomic<Node*> head_{nullptr};
+ };
+
+ // Non-concurrent list, similar to std::list.
+ struct ListHead {
+ Node* head_{nullptr};
+ Node* tail_{nullptr};
+
+ // Run func on each list node.
+ template <typename Func>
+ void forEach(Func func) {
+ auto node = tail_;
+ while (node != nullptr) {
+ auto next = node->next_;
+ func(node);
+ node = next;
+ }
+ }
+
+ // Splice other into this list.
+ // Afterwards, other is a valid empty ListHead.
+ void splice(ListHead& other);
+
+ void splice(AtomicListHead& other);
+ };
+
+ // Push a node onto the calling thread's local list.
+ void push(Node* node);
+
+ // Collect all thread local lists to a single local list.
+ // This function is threadsafe with concurrent push()es,
+ // but only a single thread may call collect() at a time.
+ void collect(ListHead& list);
+
+ private:
+ // Push list to the global list.
+ void pushGlobal(ListHead& list);
+
+ ListHead ghead_;
+
+ struct TLHead : public AtomicListHead {
+ ThreadCachedLists* parent_;
+
+ public:
+ TLHead(ThreadCachedLists* parent) : parent_(parent) {}
+
+ ~TLHead() {
+ parent_->ghead_.splice(*this);
+ }
+ };
+
+ folly::ThreadLocalPtr<TLHead, Tag> lhead_;
+};
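+
+// A rough usage sketch (MyTag and the callback body are hypothetical):
+// producers push() nodes from any thread; a single consumer collect()s
+// them and runs the callbacks:
+//
+//   struct MyTag;
+//   folly::detail::ThreadCachedLists<MyTag> lists;
+//
+//   // Any thread:
+//   auto* node = new folly::detail::ThreadCachedListsBase::Node;
+//   node->cb_ = [] { /* deferred work */ };
+//   lists.push(node);
+//
+//   // Single consumer thread:
+//   folly::detail::ThreadCachedLists<MyTag>::ListHead ready{};
+//   lists.collect(ready);
+//   ready.forEach([](auto* n) { n->cb_(); delete n; });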
+
+// push() and splice() are optimistic w.r.t. setting the list head: the
+// first pusher CASes the list head, which functions as a lock until
+// tail_ != nullptr. The first pusher then sets tail_ = head_.
+//
+// splice() does the opposite: it steals tail_ via exchange, then
+// unlocks the list again by setting head_ to nullptr.
+template <typename Tag>
+void ThreadCachedLists<Tag>::push(Node* node) {
+ DCHECK(node->next_ == nullptr);
+ static thread_local TLHead* cache_{nullptr};
+
+ if (!cache_) {
+ auto l = lhead_.get();
+ if (!l) {
+ lhead_.reset(new TLHead(this));
+ l = lhead_.get();
+ DCHECK(l);
+ }
+ cache_ = l;
+ }
+
+ while (true) {
+ auto head = cache_->head_.load(std::memory_order_relaxed);
+ if (!head) {
+ node->next_ = nullptr;
+ if (cache_->head_.compare_exchange_weak(head, node)) {
+ cache_->tail_.store(node);
+ break;
+ }
+ } else {
+ auto tail = cache_->tail_.load(std::memory_order_relaxed);
+ if (tail) {
+ node->next_ = tail;
+ if (cache_->tail_.compare_exchange_weak(node->next_, node)) {
+ break;
+ }
+ }
+ }
+ }
+}
+
+template <typename Tag>
+void ThreadCachedLists<Tag>::collect(ListHead& list) {
+ auto acc = lhead_.accessAllThreads();
+
+ for (auto& thr : acc) {
+ list.splice(thr);
+ }
+
+ list.splice(ghead_);
+}
+
+template <typename Tag>
+void ThreadCachedLists<Tag>::ListHead::splice(ListHead& other) {
+ if (other.head_ != nullptr) {
+ DCHECK(other.tail_ != nullptr);
+ } else {
+ DCHECK(other.tail_ == nullptr);
+ return;
+ }
+
+ if (head_) {
+ DCHECK(tail_ != nullptr);
+ DCHECK(head_->next_ == nullptr);
+ head_->next_ = other.tail_;
+ head_ = other.head_;
+ } else {
+ DCHECK(head_ == nullptr);
+ head_ = other.head_;
+ tail_ = other.tail_;
+ }
+
+ other.head_ = nullptr;
+ other.tail_ = nullptr;
+}
+
+template <typename Tag>
+void ThreadCachedLists<Tag>::ListHead::splice(AtomicListHead& list) {
+ ListHead local;
+
+ auto tail = list.tail_.load();
+ if (tail) {
+ local.tail_ = list.tail_.exchange(nullptr);
+ local.head_ = list.head_.exchange(nullptr);
+ splice(local);
+ }
+}
+
+} // namespace detail
+} // namespace folly
--- /dev/null
+/*
+ * Copyright 2017-present Facebook, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#include <folly/synchronization/Rcu.h>
+
+#include <sys/wait.h> // wait(), used by the fork test below
+#include <unistd.h> // fork(), usleep()
+
+#include <thread>
+#include <vector>
+
+#include <glog/logging.h>
+
+#include <folly/Benchmark.h>
+#include <folly/Random.h>
+#include <folly/portability/GFlags.h>
+#include <folly/portability/GTest.h>
+#include <folly/synchronization/Baton.h>
+
+using namespace folly;
+
+DEFINE_int64(iters, 100000, "Number of iterations");
+DEFINE_int64(threads, 32, "Number of threads");
+
+TEST(RcuTest, Basic) {
+ auto foo = new int(2);
+ rcu_retire(foo);
+}
+
+class des {
+ bool* d_;
+
+ public:
+ des(bool* d) : d_(d) {}
+ ~des() {
+ *d_ = true;
+ }
+};
+
+TEST(RcuTest, Guard) {
+ bool del = false;
+ auto foo = new des(&del);
+ { rcu_reader g; }
+ rcu_retire(foo);
+ synchronize_rcu();
+ EXPECT_TRUE(del);
+}
+
+TEST(RcuTest, Perf) {
+ long i = FLAGS_iters;
+ auto start = std::chrono::steady_clock::now();
+ while (i-- > 0) {
+ rcu_reader g;
+ }
+ auto diff = std::chrono::steady_clock::now() - start;
+ printf(
+ "Total time %li ns \n",
+ std::chrono::duration_cast<std::chrono::nanoseconds>(diff).count() /
+ FLAGS_iters);
+}
+
+TEST(RcuTest, ResetPerf) {
+ long i = FLAGS_iters;
+ auto start = std::chrono::steady_clock::now();
+ while (i-- > 0) {
+ rcu_retire<int>(nullptr, [](int*) {});
+ }
+ auto diff = std::chrono::steady_clock::now() - start;
+ printf(
+ "Total time %li ns \n",
+ std::chrono::duration_cast<std::chrono::nanoseconds>(diff).count() /
+ FLAGS_iters);
+}
+
+TEST(RcuTest, SlowReader) {
+ std::thread t;
+ {
+ rcu_reader g;
+
+ t = std::thread([&]() { synchronize_rcu(); });
+ usleep(100); // Wait for synchronize to start
+ }
+ t.join();
+}
+
+rcu_reader tryretire(des* obj) {
+ rcu_reader g;
+ rcu_retire(obj);
+ return g;
+}
+
+TEST(RcuTest, CopyGuard) {
+ bool del = false;
+ auto foo = new des(&del);
+ {
+ auto res = tryretire(foo);
+ EXPECT_FALSE(del);
+ }
+ rcu_barrier();
+ EXPECT_TRUE(del);
+}
+
+TEST(RcuTest, Stress) {
+ std::vector<std::thread> threads;
+ constexpr uint32_t sz = 1000;
+ std::atomic<int*> ints[sz];
+ for (uint i = 0; i < sz; i++) {
+ ints[i].store(new int(0));
+ }
+ for (int th = 0; th < FLAGS_threads; th++) {
+ threads.push_back(std::thread([&]() {
+ for (int i = 0; i < FLAGS_iters / 100; i++) {
+ rcu_reader g;
+ int sum = 0;
+ int* ptrs[sz];
+ for (uint j = 0; j < sz; j++) {
+ ptrs[j] = ints[j].load(std::memory_order_acquire);
+ }
+ for (uint j = 0; j < sz; j++) {
+ sum += *ptrs[j];
+ }
+ EXPECT_EQ(sum, 0);
+ }
+ }));
+ }
+ std::atomic<bool> done{false};
+ std::thread updater([&]() {
+ while (!done.load()) {
+ auto newint = new int(0);
+ auto oldint = ints[folly::Random::rand32() % sz].exchange(newint);
+ rcu_retire<int>(oldint, [](int* obj) {
+ *obj = folly::Random::rand32();
+ delete obj;
+ });
+ }
+ });
+ for (auto& t : threads) {
+ t.join();
+ }
+ done = true;
+ updater.join();
+}
+
+TEST(RcuTest, Synchronize) {
+ std::vector<std::thread> threads;
+ for (int th = 0; th < FLAGS_threads; th++) {
+ threads.push_back(std::thread([&]() {
+ for (int i = 0; i < 10; i++) {
+ synchronize_rcu();
+ }
+ }));
+ }
+ for (auto& t : threads) {
+ t.join();
+ }
+}
+
+TEST(RcuTest, NewDomainTest) {
+ struct UniqueTag;
+ rcu_domain<UniqueTag> newdomain(nullptr);
+ synchronize_rcu();
+}
+
+TEST(RcuTest, MovableReader) {
+ {
+ rcu_reader g;
+ rcu_reader f(std::move(g));
+ }
+ synchronize_rcu();
+ {
+ rcu_reader g(std::defer_lock);
+ rcu_reader f;
+ g = std::move(f);
+ }
+ synchronize_rcu();
+}
+
+TEST(RcuTest, SynchronizeInCall) {
+ rcu_default_domain()->call([]() { synchronize_rcu(); });
+ synchronize_rcu();
+}
+
+TEST(RcuTest, MoveReaderBetweenThreads) {
+ rcu_reader g;
+ std::thread t([f = std::move(g)] {});
+ t.join();
+ synchronize_rcu();
+}
+
+TEST(RcuTest, ForkTest) {
+ folly::Baton<> b;
+ rcu_token epoch;
+ std::thread t([&]() {
+ epoch = rcu_default_domain()->lock_shared();
+ b.post();
+ });
+ t.detach();
+ b.wait();
+ auto pid = fork();
+ if (pid) {
+ // parent
+ rcu_default_domain()->unlock_shared(std::move(epoch));
+ synchronize_rcu();
+ int status;
+ auto pid2 = wait(&status);
+ EXPECT_EQ(status, 0);
+ EXPECT_EQ(pid, pid2);
+ } else {
+ // child
+ synchronize_rcu();
+ exit(0); // Do not print gtest results
+ }
+}
+
+TEST(RcuTest, CoreLocalList) {
+ struct TTag;
+ folly::detail::ThreadCachedLists<TTag> lists;
+ int numthreads = 32;
+ std::vector<std::thread> threads;
+ std::atomic<int> done{0};
+ for (int tr = 0; tr < numthreads; tr++) {
+ threads.push_back(std::thread([&]() {
+ for (int i = 0; i < FLAGS_iters; i++) {
+ auto node = new folly::detail::ThreadCachedListsBase::Node;
+ lists.push(node);
+ }
+ done++;
+ }));
+ }
+ while (done.load() != numthreads) {
+ folly::detail::ThreadCachedLists<TTag>::ListHead list{};
+ lists.collect(list);
+ list.forEach([](folly::detail::ThreadCachedLists<TTag>::Node* node) {
+ delete node;
+ });
+ }
+ for (auto& thread : threads) {
+ thread.join();
+ }
+}
+
+TEST(RcuTest, ThreadDeath) {
+ bool del = false;
+ std::thread t([&] {
+ auto foo = new des(&del);
+ rcu_retire(foo);
+ });
+ t.join();
+ synchronize_rcu();
+ EXPECT_TRUE(del);
+}
+
+TEST(RcuTest, RcuObjBase) {
+ bool retired = false;
+ struct base_test : rcu_obj_base<base_test> {
+ bool* ret_;
+ base_test(bool* ret) : ret_(ret) {}
+ ~base_test() {
+ (*ret_) = true;
+ }
+ };
+
+ auto foo = new base_test(&retired);
+ foo->retire();
+ synchronize_rcu();
+ EXPECT_TRUE(retired);
+}