RCU
[folly.git] / folly / synchronization / detail / ThreadCachedInts.h
1 /*
2  * Copyright 2017-present Facebook, Inc.
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  *   http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16
17 #pragma once
18
19 #include <folly/Function.h>
20 #include <folly/ThreadLocal.h>
21 #include <folly/synchronization/AsymmetricMemoryBarrier.h>
22
23 // This is unlike folly::ThreadCachedInt in that the full value
24 // is never rounded up globally and cached, it only supports readFull.
25 //
26 // folly/experimental/TLRefCount is similar, but does not support a
27 // waitForZero, and is not reset-able.
28 //
29 // Note that the RCU implementation is completely abstracted from the
30 // counter implementation, a rseq implementation can be dropped in
31 // if the kernel supports it.
32
33 namespace folly {
34
35 namespace detail {
36
37 template <typename Tag>
38 class ThreadCachedInts {
39   // These are only accessed under the ThreadLocal lock.
40   int64_t orphan_inc_[2]{0, 0};
41   int64_t orphan_dec_[2]{0, 0};
42   folly::detail::Futex<> waiting_;
43
44   class Integer {
45    public:
46     ThreadCachedInts* ints_;
47     constexpr Integer(ThreadCachedInts* ints) noexcept
48         : ints_(ints), inc_{}, dec_{} {}
49     std::atomic<int64_t> inc_[2];
50     std::atomic<int64_t> dec_[2];
51     ~Integer() noexcept {
52       ints_->orphan_inc_[0] += inc_[0].load(std::memory_order_relaxed);
53       ints_->orphan_inc_[1] += inc_[1].load(std::memory_order_relaxed);
54       ints_->orphan_dec_[0] += dec_[0].load(std::memory_order_relaxed);
55       ints_->orphan_dec_[1] += dec_[1].load(std::memory_order_relaxed);
56       ints_->waiting_.store(0, std::memory_order_release);
57       ints_->waiting_.futexWake();
58     }
59   };
60   folly::ThreadLocalPtr<Integer, Tag> cs_;
61
62   // Cache the int pointer in a threadlocal.
63   static thread_local Integer* int_cache_;
64
65   void init() {
66     auto ret = new Integer(this);
67     cs_.reset(ret);
68     int_cache_ = ret;
69   }
70
71  public:
72   FOLLY_ALWAYS_INLINE void increment(uint8_t epoch) {
73     if (!int_cache_) {
74       init();
75     }
76
77     auto& c = int_cache_->inc_[epoch];
78     auto val = c.load(std::memory_order_relaxed);
79     c.store(val + 1, std::memory_order_relaxed);
80
81     folly::asymmetricLightBarrier(); // A
82   }
83
84   FOLLY_ALWAYS_INLINE void decrement(uint8_t epoch) {
85     folly::asymmetricLightBarrier(); // B
86     if (!int_cache_) {
87       init();
88     }
89
90     auto& c = int_cache_->dec_[epoch];
91     auto val = c.load(std::memory_order_relaxed);
92     c.store(val + 1, std::memory_order_relaxed);
93
94     folly::asymmetricLightBarrier(); // C
95     if (waiting_.load(std::memory_order_acquire)) {
96       waiting_.store(0, std::memory_order_release);
97       waiting_.futexWake();
98     }
99   }
100
101   int64_t readFull(uint8_t epoch) {
102     int64_t full = 0;
103
104     // Matches A - ensure all threads have seen new value of version,
105     // *and* that we see current values of counters in readFull()
106     //
107     // Note that in lock_shared if a reader is currently between the
108     // version load and counter increment, they may update the wrong
109     // epoch.  However, this is ok - they started concurrently *after*
110     // any callbacks that will run, and therefore it is safe to run
111     // the callbacks.
112     folly::asymmetricHeavyBarrier();
113     for (auto& i : cs_.accessAllThreads()) {
114       full -= i.dec_[epoch].load(std::memory_order_relaxed);
115     }
116
117     // Matches B - ensure that all increments are seen if decrements
118     // are seen. This is necessary because increment and decrement
119     // are allowed to happen on different threads.
120     folly::asymmetricHeavyBarrier();
121
122     auto accessor = cs_.accessAllThreads();
123     for (auto& i : accessor) {
124       full += i.inc_[epoch].load(std::memory_order_relaxed);
125     }
126
127     // orphan is read behind accessAllThreads lock
128     auto res = full + orphan_inc_[epoch] - orphan_dec_[epoch];
129     return res;
130   }
131
132   void waitForZero(uint8_t phase) {
133     // Try reading before futex sleeping.
134     if (readFull(phase) == 0) {
135       return;
136     }
137
138     while (true) {
139       waiting_.store(1, std::memory_order_release);
140       // Matches C.  Ensure either decrement sees waiting_,
141       // or we see their decrement and can safely sleep.
142       folly::asymmetricHeavyBarrier();
143       if (readFull(phase) == 0) {
144         break;
145       }
146       waiting_.futexWait(1);
147     }
148     waiting_.store(0, std::memory_order_relaxed);
149   }
150
151   // We are guaranteed to be called while StaticMeta lock is still
152   // held because of ordering in AtForkList.  We can therefore safely
153   // touch orphan_ and clear out all counts.
154   void resetAfterFork() {
155     if (int_cache_) {
156       int_cache_->dec_[0].store(0, std::memory_order_relaxed);
157       int_cache_->dec_[1].store(0, std::memory_order_relaxed);
158       int_cache_->inc_[0].store(0, std::memory_order_relaxed);
159       int_cache_->inc_[1].store(0, std::memory_order_relaxed);
160     }
161     orphan_inc_[0] = 0;
162     orphan_inc_[1] = 0;
163     orphan_dec_[0] = 0;
164     orphan_dec_[1] = 0;
165   }
166 };
167
168 template <typename Tag>
169 thread_local typename detail::ThreadCachedInts<Tag>::Integer*
170     detail::ThreadCachedInts<Tag>::int_cache_{nullptr};
171
172 } // namespace detail
173 } // namespace folly