From f885995c00a27e0e440c0e5cb66ec3298fc507d4 Mon Sep 17 00:00:00 2001 From: Dave Watson Date: Wed, 13 Dec 2017 08:13:20 -0800 Subject: [PATCH] RCU Summary: This adds an RCU implementation, matching http://www.open-std.org/jtc1/sc22/wg21/docs/papers/2017/p0461r1.pdf as closely as pratical. This implementation does not require thread registration or quiescence. Reviewed By: magedm Differential Revision: D6330631 fbshipit-source-id: 2c729f3a4c0f151cde5d9a599ecd2a2c20c7da55 --- folly/synchronization/Rcu-inl.h | 175 +++++++ folly/synchronization/Rcu.cpp | 22 + folly/synchronization/Rcu.h | 462 ++++++++++++++++++ .../synchronization/detail/ThreadCachedInts.h | 173 +++++++ .../detail/ThreadCachedLists.h | 189 +++++++ folly/synchronization/test/RcuTest.cpp | 276 +++++++++++ 6 files changed, 1297 insertions(+) create mode 100644 folly/synchronization/Rcu-inl.h create mode 100644 folly/synchronization/Rcu.cpp create mode 100644 folly/synchronization/Rcu.h create mode 100644 folly/synchronization/detail/ThreadCachedInts.h create mode 100644 folly/synchronization/detail/ThreadCachedLists.h create mode 100644 folly/synchronization/test/RcuTest.cpp diff --git a/folly/synchronization/Rcu-inl.h b/folly/synchronization/Rcu-inl.h new file mode 100644 index 00000000..162a6e08 --- /dev/null +++ b/folly/synchronization/Rcu-inl.h @@ -0,0 +1,175 @@ +/* + * Copyright 2017-present Facebook, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include +#include +#include + +namespace folly { + +template +bool rcu_domain::singleton_{false}; + +template +rcu_domain::rcu_domain(Executor* executor) noexcept + : executor_(executor ? executor : &QueuedImmediateExecutor::instance()) { + // Please use a unique tag for each domain. + CHECK(!singleton_); + singleton_ = true; + + // Register fork handlers. Holding read locks across fork is not + // supported. Using read locks in other atfork handlers is not + // supported. Other atfork handlers launching new child threads + // that use read locks *is* supported. + detail::AtFork::registerHandler( + this, + [this]() { syncMutex_.lock(); }, + [this]() { syncMutex_.unlock(); }, + [this]() { + counters_.resetAfterFork(); + syncMutex_.unlock(); + }); +} + +template +rcu_domain::~rcu_domain() { + detail::AtFork::unregisterHandler(this); +} + +template +rcu_token rcu_domain::lock_shared() { + auto idx = version_.load(std::memory_order_acquire); + idx &= 1; + counters_.increment(idx); + + return idx; +} + +template +void rcu_domain::unlock_shared(rcu_token&& token) { + DCHECK(0 == token.epoch_ || 1 == token.epoch_); + counters_.decrement(token.epoch_); +} + +template +template +void rcu_domain::call(T&& cbin) { + auto node = new list_node; + node->cb_ = [node, cb = std::forward(cbin)]() { + cb(); + delete node; + }; + retire(node); +} + +template +void rcu_domain::retire(list_node* node) noexcept { + q_.push(node); + + // Note that it's likely we hold a read lock here, + // so we can only half_sync(false). 
half_sync(true) + // or a synchronize() call might block forever. + uint64_t time = std::chrono::duration_cast( + std::chrono::steady_clock::now().time_since_epoch()) + .count(); + if (time > syncTime_.load(std::memory_order_relaxed) + syncTimePeriod_) { + list_head finished; + { + std::lock_guard g(syncMutex_); + syncTime_.store(time, std::memory_order_relaxed); + half_sync(false, finished); + } + // callbacks are called outside of syncMutex_ + finished.forEach( + [&](list_node* item) { executor_->add(std::move(item->cb_)); }); + } +} + +template +void rcu_domain::synchronize() noexcept { + auto curr = version_.load(std::memory_order_acquire); + // Target is two epochs away. + auto target = curr + 2; + while (true) { + // Try to assign ourselves to do the sync work. + // If someone else is already assigned, we can wait for + // the work to be finished by waiting on turn_. + auto work = work_.load(std::memory_order_acquire); + auto tmp = work; + if (work < target && work_.compare_exchange_strong(tmp, target)) { + list_head finished; + { + std::lock_guard g(syncMutex_); + while (version_.load(std::memory_order_acquire) < target) { + half_sync(true, finished); + } + } + // callbacks are called outside of syncMutex_ + finished.forEach( + [&](list_node* node) { executor_->add(std::move(node->cb_)); }); + return; + } else { + if (version_.load(std::memory_order_acquire) >= target) { + return; + } + std::atomic cutoff{100}; + // Wait for someone to finish the work. + turn_.tryWaitForTurn(work, cutoff, false); + } + } +} + +/* + * Not multithread safe, but it could be with proper version + * checking and stronger increment of version. See + * https://github.com/pramalhe/ConcurrencyFreaks/blob/master/papers/gracesharingurcu-2016.pdf + * + * This version, however, can go to sleep if there are outstanding + * readers, and does not spin or need rescheduling, unless blocking = false. + */ +template +void rcu_domain::half_sync(bool blocking, list_head& finished) { + uint64_t curr = version_.load(std::memory_order_acquire); + auto next = curr + 1; + + // Push all work to a queue for moving through two epochs. One + // version is not enough because of late readers of the version_ + // counter in lock_shared. + // + // Note that for a similar reason we can't swap out the q here, + // and instead drain it, so concurrent calls to call() are safe, + // and will wait for the next epoch. + q_.collect(queues_[0]); + + if (blocking) { + counters_.waitForZero(next & 1); + } else { + if (counters_.readFull(next & 1) != 0) { + return; + } + } + + // Run callbacks that have been through two epochs, and swap queues + // for those only through a single epoch. + finished.splice(queues_[1]); + queues_[1].splice(queues_[0]); + + version_.store(next, std::memory_order_release); + // Notify synchronous waiters in synchronize(). + turn_.completeTurn(curr); +} + +} // namespace folly diff --git a/folly/synchronization/Rcu.cpp b/folly/synchronization/Rcu.cpp new file mode 100644 index 00000000..c74c1c20 --- /dev/null +++ b/folly/synchronization/Rcu.cpp @@ -0,0 +1,22 @@ +/* + * Copyright 2017-present Facebook, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. 
+ * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +#include + +namespace folly { + +FOLLY_STATIC_CTOR_PRIORITY_MAX rcu_domain rcu_default_domain_; + +} // namespace folly diff --git a/folly/synchronization/Rcu.h b/folly/synchronization/Rcu.h new file mode 100644 index 00000000..670ec786 --- /dev/null +++ b/folly/synchronization/Rcu.h @@ -0,0 +1,462 @@ +/* + * Copyright 2017-present Facebook, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +#pragma once + +#include +#include +#include + +#include +#include +#include +#include +#include + +// Implementation of proposed RCU C++ API +// http://open-std.org/JTC1/SC22/WG21/docs/papers/2017/p0566r3.pdf + +// Overview + +// This file provides a low-overhead mechanism to guarantee ordering +// between operations on shared data. In the simplest usage pattern, +// readers enter a critical section, view some state, and leave the +// critical section, while writers modify shared state and then defer +// some cleanup operations. Proper use of these classes will guarantee +// that a cleanup operation that is deferred during a reader critical +// section will not be executed until after that critical section is +// over. + +// Example + +// As an example, suppose we have some configuration data that gets +// periodically updated. We need to protect ourselves on *every* read +// path, even if updates are only vanishly rare, because we don't know +// if some writer will come along and yank the data out from under us. +// +// Here's how that might look without deferral: +// +// void doSomething(IPAddress host); +// +// folly::SharedMutex sm; +// ConfigData* globalConfigData; +// +// void reader() { +// while (true) { +// sm.lock_shared(); +// IPAddress curManagementServer = globalConfigData->managementServerIP; +// sm.unlock_shared(); +// doSomethingWith(curManagementServer); +// } +// } +// +// void writer() { +// while (true) { +// std::this_thread::sleep_for(std::chrono::seconds(60)); +// ConfigData* oldConfigData = globalConfigData; +// ConfigData* newConfigData = loadConfigDataFromRemoteServer(); +// sm.lock(); +// globalConfigData = newConfigData; +// sm.unlock(); +// delete oldConfigData; +// } +// } +// +// The readers have to lock_shared and unlock_shared every iteration, even +// during the overwhelming majority of the time in which there's no writer +// present. These functions are surprisingly expensive; there's around 30ns of +// overhead per critical section on my machine. +// +// Let's get rid of the locking. The readers and writer may proceed +// concurrently. 
Here's how this would look: +// +// void doSomething(IPAddress host); +// +// std::atomic globalConfigData; +// +// void reader() { +// while (true) { +// ConfigData* configData = globalConfigData.load(); +// IPAddress curManagementServer = configData->managementServerIP; +// doSomethingWith(curManagementServer); +// } +// } +// +// void writer() { +// while (true) { +// std::this_thread::sleep_for(std::chrono::seconds(60)); +// ConfigData* newConfigData = loadConfigDataFromRemoteServer(); +// globalConfigData.store(newConfigData); +// // We can't delete the old ConfigData; we don't know when the readers +// // will be done with it! I guess we have to leak it... +// } +// } +// +// This works and is fast, but we don't ever reclaim the memory we +// allocated for the copy of the data. We need a way for the writer to +// know that no readers observed the old value of the pointer and are +// still using it. Tweaking this slightly, we want a way for the +// writer to say "I want some operation (deleting the old ConfigData) +// to happen, but only after I know that all readers that started +// before I requested the action have finished.". The classes in this +// namespace allow this. Here's how we'd express this idea: +// +// void doSomething(IPAddress host); +// std::atomic globalConfigData; +// +// +// void reader() { +// while (true) { +// IPAddress curManagementServer; +// { +// // We're about to do some reads we want to protect; if we read a +// // pointer, we need to make sure that if the writer comes along and +// // updates it, the writer's cleanup operation won't happen until we're +// // done accessing the pointed-to data. We get a Guard on that +// // domain; as long as it exists, no function subsequently passed to +// // invokeEventually will execute. +// rcu_reader guard; +// ConfigData* configData = globalConfigData.load(); +// // We created a guard before we read globalConfigData; we know that the +// // pointer will remain valid until the guard is destroyed. +// curManagementServer = configData->managementServerIP; +// // Guard is released here; retired objects may be freed. +// } +// doSomethingWith(curManagementServer); +// } +// } +// +// void writer() { +// +// while (true) { +// std::this_thread::sleep_for(std::chrono::seconds(60)); +// ConfigData* oldConfigData = globalConfigData.load(); +// ConfigData* newConfigData = loadConfigDataFromRemoteServer(); +// globalConfigData.store(newConfigData); +// rcu_retire(oldConfigData); +// // alternatively, in a blocking manner: +// // synchronize_rcu(); +// // delete oldConfigData; +// } +// } +// +// This gets us close to the speed of the second solution, without +// leaking memory. A rcu_reader costs about 4 ns, faster than the +// lock_shared() / unlock_shared() pair in the more traditional +// mutex-based approach from our first attempt, and the writer +// never blocks the readers. + +// Notes + +// This implementation does implement an rcu_domain, and provides a default +// one for use per the standard implementation. +// +// rcu_domain: +// A "universe" of deferred execution. Each rcu_domain has an +// executor on which deferred functions may execute. Readers obtain +// Tokens from an rcu_domain and release them back to it. +// rcu_domains should in general be completely separated; it's never +// correct to pass a token from one domain to another, and +// rcu_reader objects created on one domain do not prevent functions +// deferred on other domains from running. 
It's intended that most +// callers should only ever use the default, global domain. +// +// Creation of a domain takes a template tag argument, which +// defaults to void. To access different domains, you have to pass a +// different tag. The global domain is preferred for almost all +// purposes, unless a different executor is required. +// +// You should use a custom rcu_domain if you can't avoid sleeping +// during reader critical sections (because you don't want to block +// all other users of the domain while you sleep), or you want to change +// the default executor type. + +// API correctness limitations: +// - Exceptions: +// In short, nothing about this is exception safe. retire functions should +// not throw exceptions in their destructors, move constructors or call +// operators. +// +// Performance limitations: +// - Blocking: +// A blocked reader will block invocation of deferred functions until it +// becomes unblocked. Sleeping while holding a rcu_reader can have bad +// performance consequences. +// +// API questions you might have: +// - Nested critical sections: +// These are fine. The following is explicitly allowed by the standard, up to +// a nesting level of 100: +// rcu_reader reader1; +// doSomeWork(); +// rcu_reader reader2; +// doSomeMoreWork(); +// - Restrictions on retired()ed functions: +// Any operation is safe from within a retired function's +// execution; you can retire additional functions or add a new domain call to +// the domain. +// - rcu_domain destruction: +// Destruction of a domain assumes previous synchronization: all remaining +// call and retire calls are immediately added to the executor. + +// Overheads + +// Deferral latency is as low as is practical: overhead involves running +// several global memory barriers on the machine to ensure all readers are gone. +// +// Currently use of MPMCQueue is the bottleneck for deferred calls, more +// specialized queues could be used if available, since only a single reader +// reads the queue, and can splice all of the items to the executor if possible. +// +// synchronize_rcu() call latency is on the order of 10ms. Multiple +// separate threads can share a synchronized period and should scale. +// +// rcu_retire() is a queue push, and on the order of 150 ns, however, +// the current implementation may synchronize if the retire queue is full, +// resulting in tail latencies of ~10ms. +// +// rcu_reader creation/destruction is ~4ns. By comparison, +// folly::SharedMutex::lock_shared + unlock_shared pair is ~26ns + +// Hazard pointers vs. RCU: +// +// Hazard pointers protect pointers, generally malloc()d pieces of memory, and +// each hazptr only protects one such block. +// +// RCU protects critical sections, *all* memory is protected as long +// as the critical section is active. +// +// For example, this has implications for linked lists: If you wish to +// free an entire linked list, you simply rcu_retire() each node with +// RCU: readers will see either an entirely valid list, or no list at +// all. +// +// Conversely with hazptrs, generally lists are walked with +// hand-over-hand hazptrs. Only one node is protected at a time. So +// anywhere in the middle of the list, hazptrs may read NULL, and throw +// away all current work. Alternatively, reference counting can be used, +// (as if each node was a shared_ptr), so that readers will always see +// *the remaining part* of the list as valid, however parts before its current +// hazptr may be freed. 
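+//
+// A minimal sketch of that RCU list teardown (Node, listHead and clearList
+// are hypothetical names for illustration; they are not part of this diff):
+//
+// struct Node { int value; Node* next; };
+// std::atomic<Node*> listHead;
+//
+// void clearList() {
+//   Node* node = listHead.exchange(nullptr);
+//   while (node != nullptr) {
+//     Node* next = node->next;
+//     // Readers inside an rcu_reader section still see a fully linked list;
+//     // each node is freed only after their critical sections end.
+//     rcu_retire(node);
+//     node = next;
+//   }
+// }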
+// +// So roughly: RCU is simple, but an all-or-nothing affair. A single rcu_reader +// can block all reclamation. Hazptrs will reclaim exactly as much as possible, +// at the cost of extra work writing traversal code +// +// Reproduced from +// http://www.open-std.org/jtc1/sc22/wg21/docs/papers/2017/p0461r1.pdf +// +// Reference Counting RCU Hazptr +// +// Unreclaimed objects Bounded Unbounded Bounded +// +// Contention among readers High None None +// +// Traversal forward progress lock-free wait-free lock-free +// +// Reclamation forward progress lock-free blocking wait-free +// +// Traversal speed atomic no-overhead folly's is +// no-overhead +// +// Reference acquisition unconditional unconditional conditional +// +// Automatic reclamation yes no no +// +// Purpose of domains N/A isolate slow configeration +// readers + +namespace folly { + +struct RcuTag; + +template +class rcu_domain; + +// Opaque token used to match up lock_shared() and unlock_shared() +// pairs. +class rcu_token { + public: + rcu_token(uint64_t epoch) : epoch_(epoch) {} + rcu_token() {} + ~rcu_token() = default; + + rcu_token(const rcu_token&) = delete; + rcu_token(rcu_token&& other) = default; + rcu_token& operator=(const rcu_token& other) = delete; + rcu_token& operator=(rcu_token&& other) = default; + + private: + template + friend class rcu_domain; + uint64_t epoch_; +}; + +// For most usages, rcu_domain is unnecessary, and you can use +// rcu_reader and rcu_retire/synchronize_rcu directly. +template +class rcu_domain { + using list_head = typename detail::ThreadCachedLists::ListHead; + using list_node = typename detail::ThreadCachedLists::Node; + + public: + /* + * If an executor is passed, it is used to run calls and delete + * retired objects. + */ + rcu_domain(Executor* executor = nullptr) noexcept; + + rcu_domain(const rcu_domain&) = delete; + rcu_domain(rcu_domain&&) = delete; + rcu_domain& operator=(const rcu_domain&) = delete; + rcu_domain& operator=(rcu_domain&&) = delete; + ~rcu_domain(); + + // Reader locks: Prevent any calls from occuring, retired memory + // from being freed, and synchronize() calls from completing until + // all preceding lock_shared() sections are finished. + + // Note: can potentially allocate on thread first use. + FOLLY_ALWAYS_INLINE rcu_token lock_shared(); + FOLLY_ALWAYS_INLINE void unlock_shared(rcu_token&&); + + // Call a function after concurrent critical sections have finished. + // Does not block unless the queue is full, then may block to wait + // for scheduler thread, but generally does not wait for full + // synchronization. + template + void call(T&& cbin); + void retire(list_node* node) noexcept; + + // Ensure concurrent critical sections have finished. + // Always waits for full synchronization. + // read lock *must not* be held. + void synchronize() noexcept; + + private: + detail::ThreadCachedInts counters_; + // Global epoch. + std::atomic version_{0}; + // Future epochs being driven by threads in synchronize + std::atomic work_{0}; + // Matches version_, for waking up threads in synchronize that are + // sharing an epoch. + detail::TurnSequencer turn_; + // Only one thread can drive half_sync. + std::mutex syncMutex_; + // Currently half_sync takes ~16ms due to heavy barriers. + // Ensure that if used in a single thread, max GC time + // is limited to 1% of total CPU time. + static constexpr uint64_t syncTimePeriod_{1600 * 2 /* full sync is 2x */}; + std::atomic syncTime_{0}; + // call()s waiting to move through two epochs. 
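+  // Drained into queues_[0] by half_sync() while syncMutex_ is held.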
+ detail::ThreadCachedLists q_; + // Executor callbacks will eventually be run on. + Executor* executor_{nullptr}; + static bool singleton_; // Ensure uniqueness per-tag. + + // Queues for callbacks waiting to go through two epochs. + list_head queues_[2]{}; + + // Move queues through one epoch (half of a full synchronize()). + // Will block waiting for readers to exit if blocking is true. + // blocking must *not* be true if the current thread is locked, + // or will deadlock. + // + // returns a list of callbacks ready to run in cbs. + void half_sync(bool blocking, list_head& cbs); +}; + +extern rcu_domain rcu_default_domain_; + +inline rcu_domain* rcu_default_domain() { + return &rcu_default_domain_; +} + +// Main reader guard class. +class rcu_reader { + public: + FOLLY_ALWAYS_INLINE rcu_reader() noexcept + : epoch_(rcu_default_domain()->lock_shared()) {} + rcu_reader(std::defer_lock_t) noexcept {} + rcu_reader(const rcu_reader&) = delete; + rcu_reader(rcu_reader&& other) noexcept : epoch_(std::move(other.epoch_)) {} + rcu_reader& operator=(const rcu_reader&) = delete; + rcu_reader& operator=(rcu_reader&& other) noexcept { + if (epoch_.has_value()) { + rcu_default_domain()->unlock_shared(std::move(epoch_.value())); + } + epoch_ = std::move(other.epoch_); + return *this; + } + + FOLLY_ALWAYS_INLINE ~rcu_reader() noexcept { + if (epoch_.has_value()) { + rcu_default_domain()->unlock_shared(std::move(epoch_.value())); + } + } + + void swap(rcu_reader& other) noexcept { + std::swap(epoch_, other.epoch_); + } + + FOLLY_ALWAYS_INLINE void lock() noexcept { + DCHECK(!epoch_.has_value()); + epoch_ = rcu_default_domain()->lock_shared(); + } + + FOLLY_ALWAYS_INLINE void unlock() noexcept { + DCHECK(epoch_.has_value()); + rcu_default_domain()->unlock_shared(std::move(epoch_.value())); + } + + private: + Optional epoch_; +}; + +inline void swap(rcu_reader& a, rcu_reader& b) noexcept { + a.swap(b); +} + +inline void synchronize_rcu() noexcept { + rcu_default_domain()->synchronize(); +} + +inline void rcu_barrier() noexcept { + rcu_default_domain()->synchronize(); +} + +// Free-function retire. Always allocates. +template > +void rcu_retire(T* p, D d = {}) { + rcu_default_domain()->call([p, del = std::move(d)]() { del(p); }); +} + +// Base class for rcu objects. retire() will use preallocated storage +// from rcu_obj_base, vs. rcu_retire() which always allocates. +template > +class rcu_obj_base : detail::ThreadCachedListsBase::Node { + public: + void retire(D d = {}) { + // This implementation assumes folly::Function has enough + // inline storage for D, otherwise, it allocates. + this->cb_ = [this, d = std::move(d)]() { d(static_cast(this)); }; + rcu_default_domain()->retire(this); + } +}; + +} // namespace folly + +#include diff --git a/folly/synchronization/detail/ThreadCachedInts.h b/folly/synchronization/detail/ThreadCachedInts.h new file mode 100644 index 00000000..2b954c1f --- /dev/null +++ b/folly/synchronization/detail/ThreadCachedInts.h @@ -0,0 +1,173 @@ +/* + * Copyright 2017-present Facebook, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#pragma once + +#include +#include +#include + +// This is unlike folly::ThreadCachedInt in that the full value +// is never rounded up globally and cached, it only supports readFull. +// +// folly/experimental/TLRefCount is similar, but does not support a +// waitForZero, and is not reset-able. +// +// Note that the RCU implementation is completely abstracted from the +// counter implementation, a rseq implementation can be dropped in +// if the kernel supports it. + +namespace folly { + +namespace detail { + +template +class ThreadCachedInts { + // These are only accessed under the ThreadLocal lock. + int64_t orphan_inc_[2]{0, 0}; + int64_t orphan_dec_[2]{0, 0}; + folly::detail::Futex<> waiting_; + + class Integer { + public: + ThreadCachedInts* ints_; + constexpr Integer(ThreadCachedInts* ints) noexcept + : ints_(ints), inc_{}, dec_{} {} + std::atomic inc_[2]; + std::atomic dec_[2]; + ~Integer() noexcept { + ints_->orphan_inc_[0] += inc_[0].load(std::memory_order_relaxed); + ints_->orphan_inc_[1] += inc_[1].load(std::memory_order_relaxed); + ints_->orphan_dec_[0] += dec_[0].load(std::memory_order_relaxed); + ints_->orphan_dec_[1] += dec_[1].load(std::memory_order_relaxed); + ints_->waiting_.store(0, std::memory_order_release); + ints_->waiting_.futexWake(); + } + }; + folly::ThreadLocalPtr cs_; + + // Cache the int pointer in a threadlocal. + static thread_local Integer* int_cache_; + + void init() { + auto ret = new Integer(this); + cs_.reset(ret); + int_cache_ = ret; + } + + public: + FOLLY_ALWAYS_INLINE void increment(uint8_t epoch) { + if (!int_cache_) { + init(); + } + + auto& c = int_cache_->inc_[epoch]; + auto val = c.load(std::memory_order_relaxed); + c.store(val + 1, std::memory_order_relaxed); + + folly::asymmetricLightBarrier(); // A + } + + FOLLY_ALWAYS_INLINE void decrement(uint8_t epoch) { + folly::asymmetricLightBarrier(); // B + if (!int_cache_) { + init(); + } + + auto& c = int_cache_->dec_[epoch]; + auto val = c.load(std::memory_order_relaxed); + c.store(val + 1, std::memory_order_relaxed); + + folly::asymmetricLightBarrier(); // C + if (waiting_.load(std::memory_order_acquire)) { + waiting_.store(0, std::memory_order_release); + waiting_.futexWake(); + } + } + + int64_t readFull(uint8_t epoch) { + int64_t full = 0; + + // Matches A - ensure all threads have seen new value of version, + // *and* that we see current values of counters in readFull() + // + // Note that in lock_shared if a reader is currently between the + // version load and counter increment, they may update the wrong + // epoch. However, this is ok - they started concurrently *after* + // any callbacks that will run, and therefore it is safe to run + // the callbacks. + folly::asymmetricHeavyBarrier(); + for (auto& i : cs_.accessAllThreads()) { + full -= i.dec_[epoch].load(std::memory_order_relaxed); + } + + // Matches B - ensure that all increments are seen if decrements + // are seen. This is necessary because increment and decrement + // are allowed to happen on different threads. + folly::asymmetricHeavyBarrier(); + + auto accessor = cs_.accessAllThreads(); + for (auto& i : accessor) { + full += i.inc_[epoch].load(std::memory_order_relaxed); + } + + // orphan is read behind accessAllThreads lock + auto res = full + orphan_inc_[epoch] - orphan_dec_[epoch]; + return res; + } + + void waitForZero(uint8_t phase) { + // Try reading before futex sleeping. 
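+    // readFull() walks every thread's counters behind two heavy barriers;
+    // if it already reads zero, we can skip the waiting_/futex protocol below.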
+ if (readFull(phase) == 0) { + return; + } + + while (true) { + waiting_.store(1, std::memory_order_release); + // Matches C. Ensure either decrement sees waiting_, + // or we see their decrement and can safely sleep. + folly::asymmetricHeavyBarrier(); + if (readFull(phase) == 0) { + break; + } + waiting_.futexWait(1); + } + waiting_.store(0, std::memory_order_relaxed); + } + + // We are guaranteed to be called while StaticMeta lock is still + // held because of ordering in AtForkList. We can therefore safely + // touch orphan_ and clear out all counts. + void resetAfterFork() { + if (int_cache_) { + int_cache_->dec_[0].store(0, std::memory_order_relaxed); + int_cache_->dec_[1].store(0, std::memory_order_relaxed); + int_cache_->inc_[0].store(0, std::memory_order_relaxed); + int_cache_->inc_[1].store(0, std::memory_order_relaxed); + } + orphan_inc_[0] = 0; + orphan_inc_[1] = 0; + orphan_dec_[0] = 0; + orphan_dec_[1] = 0; + } +}; + +template +thread_local typename detail::ThreadCachedInts::Integer* + detail::ThreadCachedInts::int_cache_{nullptr}; + +} // namespace detail +} // namespace folly diff --git a/folly/synchronization/detail/ThreadCachedLists.h b/folly/synchronization/detail/ThreadCachedLists.h new file mode 100644 index 00000000..6161d4e1 --- /dev/null +++ b/folly/synchronization/detail/ThreadCachedLists.h @@ -0,0 +1,189 @@ +/* + * Copyright 2017-present Facebook, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#pragma once + +#include + +#include +#include +#include + +namespace folly { + +namespace detail { + +// This is a thread-local cached, multi-producer single-consumer +// queue, similar to a concurrent version of std::list. +// +class ThreadCachedListsBase { + public: + struct Node { + folly::Function cb_; + Node* next_{nullptr}; + }; +}; + +template +class ThreadCachedLists : public ThreadCachedListsBase { + public: + struct AtomicListHead { + std::atomic tail_{nullptr}; + std::atomic head_{nullptr}; + }; + + // Non-concurrent list, similar to std::list. + struct ListHead { + Node* head_{nullptr}; + Node* tail_{nullptr}; + + // Run func on each list node. + template + void forEach(Func func) { + auto node = tail_; + while (node != nullptr) { + auto next = node->next_; + func(node); + node = next; + } + } + + // Splice other in to this list. + // Afterwards, other is a valid empty listhead. + void splice(ListHead& other); + + void splice(AtomicListHead& other); + }; + + // Push a node on a thread-local list. Returns true if local list + // was pushed global. + void push(Node* node); + + // Collect all thread local lists to a single local list. + // This function is threadsafe with concurrent push()es, + // but only a single thread may call collect() at a time. + void collect(ListHead& list); + + private: + // Push list to the global list. 
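+  // (Declared but not called in this diff; thread-local lists currently
+  // reach ghead_ via ~TLHead's splice.)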
+ void pushGlobal(ListHead& list); + + ListHead ghead_; + + struct TLHead : public AtomicListHead { + ThreadCachedLists* parent_; + + public: + TLHead(ThreadCachedLists* parent) : parent_(parent) {} + + ~TLHead() { + parent_->ghead_.splice(*this); + } + }; + + folly::ThreadLocalPtr lhead_; +}; + +// push() and splice() are optimistic w.r.t setting the list head: The +// first pusher cas's the list head, which functions as a lock until +// tail != null. The first pusher then sets tail_ = head_. +// +// splice() does the opposite: steals the tail_ via exchange, then +// unlocks the list again by setting head_ to null. +template +void ThreadCachedLists::push(Node* node) { + DCHECK(node->next_ == nullptr); + static thread_local TLHead* cache_{nullptr}; + + if (!cache_) { + auto l = lhead_.get(); + if (!l) { + lhead_.reset(new TLHead(this)); + l = lhead_.get(); + DCHECK(l); + } + cache_ = l; + } + + while (true) { + auto head = cache_->head_.load(std::memory_order_relaxed); + if (!head) { + node->next_ = nullptr; + if (cache_->head_.compare_exchange_weak(head, node)) { + cache_->tail_.store(node); + break; + } + } else { + auto tail = cache_->tail_.load(std::memory_order_relaxed); + if (tail) { + node->next_ = tail; + if (cache_->tail_.compare_exchange_weak(node->next_, node)) { + break; + } + } + } + } +} + +template +void ThreadCachedLists::collect(ListHead& list) { + auto acc = lhead_.accessAllThreads(); + + for (auto& thr : acc) { + list.splice(thr); + } + + list.splice(ghead_); +} + +template +void ThreadCachedLists::ListHead::splice(ListHead& other) { + if (other.head_ != nullptr) { + DCHECK(other.tail_ != nullptr); + } else { + DCHECK(other.tail_ == nullptr); + return; + } + + if (head_) { + DCHECK(tail_ != nullptr); + DCHECK(head_->next_ == nullptr); + head_->next_ = other.tail_; + head_ = other.head_; + } else { + DCHECK(head_ == nullptr); + head_ = other.head_; + tail_ = other.tail_; + } + + other.head_ = nullptr; + other.tail_ = nullptr; +} + +template +void ThreadCachedLists::ListHead::splice(AtomicListHead& list) { + ListHead local; + + auto tail = list.tail_.load(); + if (tail) { + local.tail_ = list.tail_.exchange(nullptr); + local.head_ = list.head_.exchange(nullptr); + splice(local); + } +} + +} // namespace detail +} // namespace folly diff --git a/folly/synchronization/test/RcuTest.cpp b/folly/synchronization/test/RcuTest.cpp new file mode 100644 index 00000000..5df5b0f3 --- /dev/null +++ b/folly/synchronization/test/RcuTest.cpp @@ -0,0 +1,276 @@ +/* + * Copyright 2017-present Facebook, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +#include + +#include +#include + +#include + +#include +#include +#include +#include +#include + +using namespace folly; + +DEFINE_int64(iters, 100000, "Number of iterations"); +DEFINE_int64(threads, 32, "Number of threads"); + +TEST(RcuTest, Basic) { + auto foo = new int(2); + rcu_retire(foo); +} + +class des { + bool* d_; + + public: + des(bool* d) : d_(d) {} + ~des() { + *d_ = true; + } +}; + +TEST(RcuTest, Guard) { + bool del = false; + auto foo = new des(&del); + { rcu_reader g; } + rcu_retire(foo); + synchronize_rcu(); + EXPECT_TRUE(del); +} + +TEST(RcuTest, Perf) { + long i = FLAGS_iters; + auto start = std::chrono::steady_clock::now(); + while (i-- > 0) { + rcu_reader g; + } + auto diff = std::chrono::steady_clock::now() - start; + printf( + "Total time %li ns \n", + std::chrono::duration_cast(diff).count() / + FLAGS_iters); +} + +TEST(RcuTest, ResetPerf) { + long i = FLAGS_iters; + auto start = std::chrono::steady_clock::now(); + while (i-- > 0) { + rcu_retire(nullptr, [](int*) {}); + } + auto diff = std::chrono::steady_clock::now() - start; + printf( + "Total time %li ns \n", + std::chrono::duration_cast(diff).count() / + FLAGS_iters); +} + +TEST(RcuTest, SlowReader) { + std::thread t; + { + rcu_reader g; + + t = std::thread([&]() { synchronize_rcu(); }); + usleep(100); // Wait for synchronize to start + } + t.join(); +} + +rcu_reader tryretire(des* obj) { + rcu_reader g; + rcu_retire(obj); + return g; +} + +TEST(RcuTest, CopyGuard) { + bool del = false; + auto foo = new des(&del); + { + auto res = tryretire(foo); + EXPECT_FALSE(del); + } + rcu_barrier(); + EXPECT_TRUE(del); +} + +TEST(RcuTest, Stress) { + std::vector threads; + constexpr uint32_t sz = 1000; + std::atomic ints[sz]; + for (uint i = 0; i < sz; i++) { + ints[i].store(new int(0)); + } + for (int th = 0; th < FLAGS_threads; th++) { + threads.push_back(std::thread([&]() { + for (int i = 0; i < FLAGS_iters / 100; i++) { + rcu_reader g; + int sum = 0; + int* ptrs[sz]; + for (uint j = 0; j < sz; j++) { + ptrs[j] = ints[j].load(std::memory_order_acquire); + } + for (uint j = 0; j < sz; j++) { + sum += *ptrs[j]; + } + EXPECT_EQ(sum, 0); + } + })); + } + std::atomic done{false}; + std::thread updater([&]() { + while (!done.load()) { + auto newint = new int(0); + auto oldint = ints[folly::Random::rand32() % sz].exchange(newint); + rcu_retire(oldint, [](int* obj) { + *obj = folly::Random::rand32(); + delete obj; + }); + } + }); + for (auto& t : threads) { + t.join(); + } + done = true; + updater.join(); +} + +TEST(RcuTest, Synchronize) { + std::vector threads; + for (int th = 0; th < FLAGS_threads; th++) { + threads.push_back(std::thread([&]() { + for (int i = 0; i < 10; i++) { + synchronize_rcu(); + } + })); + } + for (auto& t : threads) { + t.join(); + } +} + +TEST(RcuTest, NewDomainTest) { + struct UniqueTag; + rcu_domain newdomain(nullptr); + synchronize_rcu(); +} + +TEST(RcuTest, MovableReader) { + { + rcu_reader g; + rcu_reader f(std::move(g)); + } + synchronize_rcu(); + { + rcu_reader g(std::defer_lock); + rcu_reader f; + g = std::move(f); + } + synchronize_rcu(); +} + +TEST(RcuTest, SynchronizeInCall) { + rcu_default_domain()->call([]() { synchronize_rcu(); }); + synchronize_rcu(); +} + +TEST(RcuTest, MoveReaderBetweenThreads) { + rcu_reader g; + std::thread t([f = std::move(g)] {}); + t.join(); + synchronize_rcu(); +} + +TEST(RcuTest, ForkTest) { + folly::Baton<> b; + rcu_token epoch; + std::thread t([&]() { + epoch = rcu_default_domain()->lock_shared(); + b.post(); + }); + t.detach(); + b.wait(); + auto pid = 
fork();
+  if (pid) {
+    // parent
+    rcu_default_domain()->unlock_shared(std::move(epoch));
+    synchronize_rcu();
+    int status;
+    auto pid2 = wait(&status);
+    EXPECT_EQ(status, 0);
+    EXPECT_EQ(pid, pid2);
+  } else {
+    // child
+    synchronize_rcu();
+    exit(0); // Do not print gtest results
+  }
+}
+
+TEST(RcuTest, CoreLocalList) {
+  struct TTag;
+  folly::detail::ThreadCachedLists<TTag> lists;
+  int numthreads = 32;
+  std::vector<std::thread> threads;
+  std::atomic<int> done{0};
+  for (int tr = 0; tr < numthreads; tr++) {
+    threads.push_back(std::thread([&]() {
+      for (int i = 0; i < FLAGS_iters; i++) {
+        auto node = new folly::detail::ThreadCachedListsBase::Node;
+        lists.push(node);
+      }
+      done++;
+    }));
+  }
+  while (done.load() != numthreads) {
+    folly::detail::ThreadCachedLists<TTag>::ListHead list{};
+    lists.collect(list);
+    list.forEach([](folly::detail::ThreadCachedLists<TTag>::Node* node) {
+      delete node;
+    });
+  }
+  for (auto& thread : threads) {
+    thread.join();
+  }
+}
+
+TEST(RcuTest, ThreadDeath) {
+  bool del = false;
+  std::thread t([&] {
+    auto foo = new des(&del);
+    rcu_retire(foo);
+  });
+  t.join();
+  synchronize_rcu();
+  EXPECT_TRUE(del);
+}
+
+TEST(RcuTest, RcuObjBase) {
+  bool retired = false;
+  struct base_test : rcu_obj_base<base_test> {
+    bool* ret_;
+    base_test(bool* ret) : ret_(ret) {}
+    ~base_test() {
+      (*ret_) = true;
+    }
+  };
+
+  auto foo = new base_test(&retired);
+  foo->retire();
+  synchronize_rcu();
+  EXPECT_TRUE(retired);
+}
-- 
2.34.1
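
A usage sketch for the rcu_obj_base path, which the header comments do not
demonstrate: it reuses the object's embedded list node instead of allocating.
This is a minimal sketch, not part of the diff; ConfigData, updateConfig and
readPort are hypothetical names, and the default deleter is assumed to be
std::default_delete, matching rcu_retire().

  #include <atomic>
  #include <folly/synchronization/Rcu.h>

  // Hypothetical config type; deriving from rcu_obj_base embeds the retire node.
  struct ConfigData : folly::rcu_obj_base<ConfigData> {
    int managementServerPort{443};
  };

  std::atomic<ConfigData*> globalConfigData{new ConfigData()};

  void updateConfig(ConfigData* newData) {
    ConfigData* old = globalConfigData.exchange(newData);
    old->retire(); // deleted via the (assumed) default deleter once prior readers finish
  }

  int readPort() {
    folly::rcu_reader guard; // protects the loaded pointer for the guard's lifetime
    return globalConfigData.load()->managementServerPort;
  }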