use in futex
[folly.git] / folly / synchronization / Rcu-inl.h
1 /*
2  * Copyright 2017-present Facebook, Inc.
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  *   http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16
17 #include <folly/Function.h>
18 #include <folly/detail/AtFork.h>
19 #include <folly/detail/TurnSequencer.h>
20
21 namespace folly {
22
23 template <typename Tag>
24 bool rcu_domain<Tag>::singleton_{false};
25
26 template <typename Tag>
27 rcu_domain<Tag>::rcu_domain(Executor* executor) noexcept
28     : executor_(executor ? executor : &QueuedImmediateExecutor::instance()) {
29   // Please use a unique tag for each domain.
30   CHECK(!singleton_);
31   singleton_ = true;
32
33   // Register fork handlers.  Holding read locks across fork is not
34   // supported.  Using read locks in other atfork handlers is not
35   // supported.  Other atfork handlers launching new child threads
36   // that use read locks *is* supported.
37   detail::AtFork::registerHandler(
38       this,
39       [this]() { syncMutex_.lock(); },
40       [this]() { syncMutex_.unlock(); },
41       [this]() {
42         counters_.resetAfterFork();
43         syncMutex_.unlock();
44       });
45 }
46
47 template <typename Tag>
48 rcu_domain<Tag>::~rcu_domain() {
49   detail::AtFork::unregisterHandler(this);
50 }
51
52 template <typename Tag>
53 rcu_token rcu_domain<Tag>::lock_shared() {
54   auto idx = version_.load(std::memory_order_acquire);
55   idx &= 1;
56   counters_.increment(idx);
57
58   return idx;
59 }
60
61 template <typename Tag>
62 void rcu_domain<Tag>::unlock_shared(rcu_token&& token) {
63   DCHECK(0 == token.epoch_ || 1 == token.epoch_);
64   counters_.decrement(token.epoch_);
65 }
66
67 template <typename Tag>
68 template <typename T>
69 void rcu_domain<Tag>::call(T&& cbin) {
70   auto node = new list_node;
71   node->cb_ = [node, cb = std::forward<T>(cbin)]() {
72     cb();
73     delete node;
74   };
75   retire(node);
76 }
77
78 template <typename Tag>
79 void rcu_domain<Tag>::retire(list_node* node) noexcept {
80   q_.push(node);
81
82   // Note that it's likely we hold a read lock here,
83   // so we can only half_sync(false).  half_sync(true)
84   // or a synchronize() call might block forever.
85   uint64_t time = std::chrono::duration_cast<std::chrono::milliseconds>(
86                       std::chrono::steady_clock::now().time_since_epoch())
87                       .count();
88   if (time > syncTime_.load(std::memory_order_relaxed) + syncTimePeriod_) {
89     list_head finished;
90     {
91       std::lock_guard<std::mutex> g(syncMutex_);
92       syncTime_.store(time, std::memory_order_relaxed);
93       half_sync(false, finished);
94     }
95     // callbacks are called outside of syncMutex_
96     finished.forEach(
97         [&](list_node* item) { executor_->add(std::move(item->cb_)); });
98   }
99 }
100
101 template <typename Tag>
102 void rcu_domain<Tag>::synchronize() noexcept {
103   auto curr = version_.load(std::memory_order_acquire);
104   // Target is two epochs away.
105   auto target = curr + 2;
106   while (true) {
107     // Try to assign ourselves to do the sync work.
108     // If someone else is already assigned, we can wait for
109     // the work to be finished by waiting on turn_.
110     auto work = work_.load(std::memory_order_acquire);
111     auto tmp = work;
112     if (work < target && work_.compare_exchange_strong(tmp, target)) {
113       list_head finished;
114       {
115         std::lock_guard<std::mutex> g(syncMutex_);
116         while (version_.load(std::memory_order_acquire) < target) {
117           half_sync(true, finished);
118         }
119       }
120       // callbacks are called outside of syncMutex_
121       finished.forEach(
122           [&](list_node* node) { executor_->add(std::move(node->cb_)); });
123       return;
124     } else {
125       if (version_.load(std::memory_order_acquire) >= target) {
126         return;
127       }
128       std::atomic<uint32_t> cutoff{100};
129       // Wait for someone to finish the work.
130       turn_.tryWaitForTurn(work, cutoff, false);
131     }
132   }
133 }
134
135 /*
136  * Not multithread safe, but it could be with proper version
137  * checking and stronger increment of version.  See
138  * https://github.com/pramalhe/ConcurrencyFreaks/blob/master/papers/gracesharingurcu-2016.pdf
139  *
140  * This version, however, can go to sleep if there are outstanding
141  * readers, and does not spin or need rescheduling, unless blocking = false.
142  */
143 template <typename Tag>
144 void rcu_domain<Tag>::half_sync(bool blocking, list_head& finished) {
145   uint64_t curr = version_.load(std::memory_order_acquire);
146   auto next = curr + 1;
147
148   // Push all work to a queue for moving through two epochs.  One
149   // version is not enough because of late readers of the version_
150   // counter in lock_shared.
151   //
152   // Note that for a similar reason we can't swap out the q here,
153   // and instead drain it, so concurrent calls to call() are safe,
154   // and will wait for the next epoch.
155   q_.collect(queues_[0]);
156
157   if (blocking) {
158     counters_.waitForZero(next & 1);
159   } else {
160     if (counters_.readFull(next & 1) != 0) {
161       return;
162     }
163   }
164
165   // Run callbacks that have been through two epochs, and swap queues
166   // for those only through a single epoch.
167   finished.splice(queues_[1]);
168   queues_[1].splice(queues_[0]);
169
170   version_.store(next, std::memory_order_release);
171   // Notify synchronous waiters in synchronize().
172   turn_.completeTurn(curr);
173 }
174
175 } // namespace folly