Adds writer test case for RCU
[folly.git] / folly / synchronization / detail / ThreadCachedInts.h
1 /*
2  * Copyright 2017-present Facebook, Inc.
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  *   http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16
17 #pragma once
18
19 #include <folly/Function.h>
20 #include <folly/ThreadLocal.h>
21 #include <folly/synchronization/AsymmetricMemoryBarrier.h>
22
23 // This is unlike folly::ThreadCachedInt in that the full value
24 // is never rounded up globally and cached, it only supports readFull.
25 //
26 // folly/experimental/TLRefCount is similar, but does not support a
27 // waitForZero, and is not reset-able.
28 //
29 // Note that the RCU implementation is completely abstracted from the
30 // counter implementation, a rseq implementation can be dropped in
31 // if the kernel supports it.
32
33 namespace folly {
34
35 namespace detail {
36
37 template <typename Tag>
38 class ThreadCachedInts {
39   std::atomic<int64_t> orphan_inc_[2];
40   std::atomic<int64_t> orphan_dec_[2];
41   folly::detail::Futex<> waiting_{0};
42
43   class Integer {
44    public:
45     ThreadCachedInts* ints_;
46     constexpr Integer(ThreadCachedInts* ints) noexcept
47         : ints_(ints), inc_{}, dec_{} {}
48     std::atomic<int64_t> inc_[2];
49     std::atomic<int64_t> dec_[2];
50     ~Integer() noexcept {
51       // Increment counts must be set before decrement counts
52       ints_->orphan_inc_[0].fetch_add(
53           inc_[0].load(std::memory_order_relaxed), std::memory_order_relaxed);
54       ints_->orphan_inc_[1].fetch_add(
55           inc_[1].load(std::memory_order_relaxed), std::memory_order_relaxed);
56       folly::asymmetricLightBarrier(); // B
57       ints_->orphan_dec_[0].fetch_add(
58           dec_[0].load(std::memory_order_relaxed), std::memory_order_relaxed);
59       ints_->orphan_dec_[1].fetch_add(
60           dec_[1].load(std::memory_order_relaxed), std::memory_order_relaxed);
61       ints_->waiting_.store(0, std::memory_order_release);
62       ints_->waiting_.futexWake();
63     }
64   };
65   folly::ThreadLocalPtr<Integer, Tag> cs_;
66
67   // Cache the int pointer in a threadlocal.
68   static thread_local Integer* int_cache_;
69
70   void init() {
71     auto ret = new Integer(this);
72     cs_.reset(ret);
73     int_cache_ = ret;
74   }
75
76  public:
77   FOLLY_ALWAYS_INLINE void increment(uint8_t epoch) {
78     if (!int_cache_) {
79       init();
80     }
81
82     auto& c = int_cache_->inc_[epoch];
83     auto val = c.load(std::memory_order_relaxed);
84     c.store(val + 1, std::memory_order_relaxed);
85
86     folly::asymmetricLightBarrier(); // A
87   }
88
89   FOLLY_ALWAYS_INLINE void decrement(uint8_t epoch) {
90     folly::asymmetricLightBarrier(); // B
91     if (!int_cache_) {
92       init();
93     }
94
95     auto& c = int_cache_->dec_[epoch];
96     auto val = c.load(std::memory_order_relaxed);
97     c.store(val + 1, std::memory_order_relaxed);
98
99     folly::asymmetricLightBarrier(); // C
100     if (waiting_.load(std::memory_order_acquire)) {
101       waiting_.store(0, std::memory_order_release);
102       waiting_.futexWake();
103     }
104   }
105
106   int64_t readFull(uint8_t epoch) {
107     int64_t full = -orphan_dec_[epoch].load(std::memory_order_relaxed);
108
109     // Matches A - ensure all threads have seen new value of version,
110     // *and* that we see current values of counters in readFull()
111     //
112     // Note that in lock_shared if a reader is currently between the
113     // version load and counter increment, they may update the wrong
114     // epoch.  However, this is ok - they started concurrently *after*
115     // any callbacks that will run, and therefore it is safe to run
116     // the callbacks.
117     folly::asymmetricHeavyBarrier();
118     for (auto& i : cs_.accessAllThreads()) {
119       full -= i.dec_[epoch].load(std::memory_order_relaxed);
120     }
121
122     // Matches B - ensure that all increments are seen if decrements
123     // are seen. This is necessary because increment and decrement
124     // are allowed to happen on different threads.
125     folly::asymmetricHeavyBarrier();
126
127     auto accessor = cs_.accessAllThreads();
128     for (auto& i : accessor) {
129       full += i.inc_[epoch].load(std::memory_order_relaxed);
130     }
131
132     // orphan is read behind accessAllThreads lock
133     return full + orphan_inc_[epoch].load(std::memory_order_relaxed);
134   }
135
136   void waitForZero(uint8_t phase) {
137     // Try reading before futex sleeping.
138     if (readFull(phase) == 0) {
139       return;
140     }
141
142     while (true) {
143       waiting_.store(1, std::memory_order_release);
144       // Matches C.  Ensure either decrement sees waiting_,
145       // or we see their decrement and can safely sleep.
146       folly::asymmetricHeavyBarrier();
147       if (readFull(phase) == 0) {
148         break;
149       }
150       waiting_.futexWait(1);
151     }
152     waiting_.store(0, std::memory_order_relaxed);
153   }
154
155   // We are guaranteed to be called while StaticMeta lock is still
156   // held because of ordering in AtForkList.  We can therefore safely
157   // touch orphan_ and clear out all counts.
158   void resetAfterFork() {
159     if (int_cache_) {
160       int_cache_->dec_[0].store(0, std::memory_order_relaxed);
161       int_cache_->dec_[1].store(0, std::memory_order_relaxed);
162       int_cache_->inc_[0].store(0, std::memory_order_relaxed);
163       int_cache_->inc_[1].store(0, std::memory_order_relaxed);
164     }
165     orphan_inc_[0].store(0, std::memory_order_relaxed);
166     orphan_inc_[1].store(0, std::memory_order_relaxed);
167     orphan_dec_[0].store(0, std::memory_order_relaxed);
168     orphan_dec_[1].store(0, std::memory_order_relaxed);
169   }
170 };
171
172 template <typename Tag>
173 thread_local typename detail::ThreadCachedInts<Tag>::Integer*
174     detail::ThreadCachedInts<Tag>::int_cache_{nullptr};
175
176 } // namespace detail
177 } // namespace folly