2 * Copyright 2017-present Facebook, Inc.
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
8 * http://www.apache.org/licenses/LICENSE-2.0
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
19 #include <folly/Function.h>
20 #include <folly/ThreadLocal.h>
21 #include <folly/synchronization/AsymmetricMemoryBarrier.h>
23 // This is unlike folly::ThreadCachedInt in that the full value
24 // is never rounded up globally and cached, it only supports readFull.
26 // folly/experimental/TLRefCount is similar, but does not support a
27 // waitForZero, and is not reset-able.
29 // Note that the RCU implementation is completely abstracted from the
30 // counter implementation, a rseq implementation can be dropped in
31 // if the kernel supports it.
37 template <typename Tag>
38 class ThreadCachedInts {
// Counts folded in from threads that have already exited; the
// thread-exit code below adds a dying thread's per-thread tallies
// here so readFull() keeps seeing them. Indexed by epoch (0 or 1).
39 std::atomic<int64_t> orphan_inc_[2];
40 std::atomic<int64_t> orphan_dec_[2];
// Futex a waiter in waitForZero() sleeps on; set to 1 before
// sleeping, cleared and woken by decrement() and by thread exit.
41 folly::detail::Futex<> waiting_{0};
// Back-pointer from a per-thread Integer record to its owner, so the
// exit path can reach orphan_* and waiting_.
45 ThreadCachedInts* ints_;
46 constexpr Integer(ThreadCachedInts* ints) noexcept
47 : ints_(ints), inc_{}, dec_{} {}
// Per-epoch increment/decrement tallies. Presumably written only by
// the owning thread (they live in a thread-local record), which is
// why non-RMW load/store updates suffice below — TODO confirm.
48 std::atomic<int64_t> inc_[2];
49 std::atomic<int64_t> dec_[2];
// Thread-exit fold-in: move this thread's tallies into the shared
// orphan arrays. Ordering note below explains the inc-before-dec
// requirement; barrier B separates the two phases.
51 // Increment counts must be set before decrement counts
52 ints_->orphan_inc_[0].fetch_add(
53 inc_[0].load(std::memory_order_relaxed), std::memory_order_relaxed);
54 ints_->orphan_inc_[1].fetch_add(
55 inc_[1].load(std::memory_order_relaxed), std::memory_order_relaxed);
56 folly::asymmetricLightBarrier(); // B
57 ints_->orphan_dec_[0].fetch_add(
58 dec_[0].load(std::memory_order_relaxed), std::memory_order_relaxed);
59 ints_->orphan_dec_[1].fetch_add(
60 dec_[1].load(std::memory_order_relaxed), std::memory_order_relaxed);
// A waiter may be parked on waiting_; our counts just moved homes,
// so wake it to re-run readFull().
61 ints_->waiting_.store(0, std::memory_order_release);
62 ints_->waiting_.futexWake();
// One Integer per thread; Tag keeps distinct instantiations in
// distinct ThreadLocal maps. accessAllThreads() (used in readFull)
// iterates every live thread's record under the ThreadLocal lock.
65 folly::ThreadLocalPtr<Integer, Tag> cs_;
67 // Cache the int pointer in a threadlocal.
68 static thread_local Integer* int_cache_;
// Lazily create this thread's Integer record on first use.
71 auto ret = new Integer(this);
// Record one reader-section entry for `epoch`. The plain
// load-then-store (instead of fetch_add) avoids an atomic RMW on the
// fast path; safe only under the single-writer assumption noted on
// inc_/dec_ above.
77 FOLLY_ALWAYS_INLINE void increment(uint8_t epoch) {
82 auto& c = int_cache_->inc_[epoch];
83 auto val = c.load(std::memory_order_relaxed);
84 c.store(val + 1, std::memory_order_relaxed);
// Barrier A pairs with the heavy barrier in readFull(): ensures the
// counter bump is visible to a subsequent full read.
86 folly::asymmetricLightBarrier(); // A
// Record one reader-section exit for `epoch`, then wake a waiter if
// one has announced itself via waiting_.
89 FOLLY_ALWAYS_INLINE void decrement(uint8_t epoch) {
90 folly::asymmetricLightBarrier(); // B
95 auto& c = int_cache_->dec_[epoch];
96 auto val = c.load(std::memory_order_relaxed);
97 c.store(val + 1, std::memory_order_relaxed);
// Barrier C pairs with the heavy barrier in waitForZero(): either
// the waiter sees our decrement, or we see waiting_ and wake it.
99 folly::asymmetricLightBarrier(); // C
100 if (waiting_.load(std::memory_order_acquire)) {
101 waiting_.store(0, std::memory_order_release);
102 waiting_.futexWake();
// Sum (increments - decrements) for `epoch` across all live threads
// plus the orphan counts from exited threads. Decrements are read
// first; the heavy barriers below justify why the two passes cannot
// under-count.
106 int64_t readFull(uint8_t epoch) {
107 int64_t full = -orphan_dec_[epoch].load(std::memory_order_relaxed);
109 // Matches A - ensure all threads have seen new value of version,
110 // *and* that we see current values of counters in readFull()
112 // Note that in lock_shared if a reader is currently between the
113 // version load and counter increment, they may update the wrong
114 // epoch. However, this is ok - they started concurrently *after*
115 // any callbacks that will run, and therefore it is safe to run
117 folly::asymmetricHeavyBarrier();
118 for (auto& i : cs_.accessAllThreads()) {
119 full -= i.dec_[epoch].load(std::memory_order_relaxed);
122 // Matches B - ensure that all increments are seen if decrements
123 // are seen. This is necessary because increment and decrement
124 // are allowed to happen on different threads.
125 folly::asymmetricHeavyBarrier();
127 auto accessor = cs_.accessAllThreads();
128 for (auto& i : accessor) {
129 full += i.inc_[epoch].load(std::memory_order_relaxed);
132 // orphan is read behind accessAllThreads lock
133 return full + orphan_inc_[epoch].load(std::memory_order_relaxed);
// Block until the reader count for `phase` reaches zero, using the
// futex to sleep between re-reads rather than spinning.
136 void waitForZero(uint8_t phase) {
137 // Try reading before futex sleeping.
138 if (readFull(phase) == 0) {
// Announce intent to sleep before the final re-check; barrier C on
// the decrement side guarantees we cannot miss the last wakeup.
143 waiting_.store(1, std::memory_order_release);
144 // Matches C. Ensure either decrement sees waiting_,
145 // or we see their decrement and can safely sleep.
146 folly::asymmetricHeavyBarrier();
147 if (readFull(phase) == 0) {
150 waiting_.futexWait(1);
152 waiting_.store(0, std::memory_order_relaxed);
155 // We are guaranteed to be called while StaticMeta lock is still
156 // held because of ordering in AtForkList. We can therefore safely
157 // touch orphan_ and clear out all counts.
// Post-fork reset: zero this (sole surviving) thread's tallies and
// the orphan counts. Relaxed stores suffice since no other threads
// exist in the child — per the AtForkList note above.
158 void resetAfterFork() {
160 int_cache_->dec_[0].store(0, std::memory_order_relaxed);
161 int_cache_->dec_[1].store(0, std::memory_order_relaxed);
162 int_cache_->inc_[0].store(0, std::memory_order_relaxed);
163 int_cache_->inc_[1].store(0, std::memory_order_relaxed);
165 orphan_inc_[0].store(0, std::memory_order_relaxed);
166 orphan_inc_[1].store(0, std::memory_order_relaxed);
167 orphan_dec_[0].store(0, std::memory_order_relaxed);
168 orphan_dec_[1].store(0, std::memory_order_relaxed);
// Out-of-class definition of the per-thread cached Integer pointer;
// starts null and is populated lazily on a thread's first counter use.
172 template <typename Tag>
173 thread_local typename detail::ThreadCachedInts<Tag>::Integer*
174 detail::ThreadCachedInts<Tag>::int_cache_{nullptr};
176 } // namespace detail