add per node mutex for emulated futex
[folly.git] / folly / detail / Futex.cpp
1 /*
2  * Copyright 2014 Facebook, Inc.
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  *   http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16
17 #include <folly/detail/Futex.h>
18 #include <stdint.h>
19 #include <string.h>
20 #include <condition_variable>
21 #include <mutex>
22 #include <boost/intrusive/list.hpp>
23 #include <folly/Hash.h>
24 #include <folly/ScopeGuard.h>
25
26 #ifdef __linux__
27 # include <errno.h>
28 # include <linux/futex.h>
29 # include <sys/syscall.h>
30 #endif
31
32 using namespace std::chrono;
33
34 namespace folly { namespace detail {
35
36 namespace {
37
38 ////////////////////////////////////////////////////
39 // native implementation using the futex() syscall
40
41 #ifdef __linux__
42
43 int nativeFutexWake(void* addr, int count, uint32_t wakeMask) {
44   int rv = syscall(SYS_futex,
45                    addr, /* addr1 */
46                    FUTEX_WAKE_BITSET | FUTEX_PRIVATE_FLAG, /* op */
47                    count, /* val */
48                    nullptr, /* timeout */
49                    nullptr, /* addr2 */
50                    wakeMask); /* val3 */
51
52   assert(rv >= 0);
53
54   return rv;
55 }
56
57 template <class Clock>
58 struct timespec
59 timeSpecFromTimePoint(time_point<Clock> absTime)
60 {
61   auto duration = absTime.time_since_epoch();
62   auto secs = duration_cast<seconds>(duration);
63   auto nanos = duration_cast<nanoseconds>(duration - secs);
64   struct timespec result = { secs.count(), nanos.count() };
65   return result;
66 }
67
68 FutexResult nativeFutexWaitImpl(void* addr,
69                                 uint32_t expected,
70                                 time_point<system_clock>* absSystemTime,
71                                 time_point<steady_clock>* absSteadyTime,
72                                 uint32_t waitMask) {
73   assert(absSystemTime == nullptr || absSteadyTime == nullptr);
74
75   int op = FUTEX_WAIT_BITSET | FUTEX_PRIVATE_FLAG;
76   struct timespec ts;
77   struct timespec* timeout = nullptr;
78
79   if (absSystemTime != nullptr) {
80     op |= FUTEX_CLOCK_REALTIME;
81     ts = timeSpecFromTimePoint(*absSystemTime);
82     timeout = &ts;
83   } else if (absSteadyTime != nullptr) {
84     ts = timeSpecFromTimePoint(*absSteadyTime);
85     timeout = &ts;
86   }
87
88   // Unlike FUTEX_WAIT, FUTEX_WAIT_BITSET requires an absolute timeout
89   // value - http://locklessinc.com/articles/futex_cheat_sheet/
90   int rv = syscall(SYS_futex,
91                    addr, /* addr1 */
92                    op, /* op */
93                    expected, /* val */
94                    timeout, /* timeout */
95                    nullptr, /* addr2 */
96                    waitMask); /* val3 */
97
98   if (rv == 0) {
99     return FutexResult::AWOKEN;
100   } else {
101     switch(errno) {
102       case ETIMEDOUT:
103         assert(timeout != nullptr);
104         return FutexResult::TIMEDOUT;
105       case EINTR:
106         return FutexResult::INTERRUPTED;
107       case EWOULDBLOCK:
108         return FutexResult::VALUE_CHANGED;
109       default:
110         assert(false);
111         // EACCESS, EFAULT, or EINVAL. All of these mean *addr point to
112         // invalid memory (or I misunderstand the API).  We can either
113         // crash, or return a value that lets the process continue for
114         // a bit. We choose the latter. VALUE_CHANGED probably turns the
115         // caller into a spin lock.
116         return FutexResult::VALUE_CHANGED;
117     }
118   }
119 }
120
121 #endif // __linux__
122
123 ///////////////////////////////////////////////////////
124 // compatibility implementation using standard C++ API
125
126 // Our emulated futex uses 4096 lists of wait nodes.  There are two levels
127 // of locking: a per-list mutex that controls access to the list and a
128 // per-node mutex, condvar, and bool that are used for the actual wakeups.
129 // The per-node mutex allows us to do precise wakeups without thundering
130 // herds.
131
132 struct EmulatedFutexWaitNode : public boost::intrusive::list_base_hook<> {
133   void* const addr_;
134   const uint32_t waitMask_;
135
136   // tricky: hold both bucket and node mutex to write, either to read
137   bool signaled_;
138   std::mutex mutex_;
139   std::condition_variable cond_;
140
141   EmulatedFutexWaitNode(void* addr, uint32_t waitMask)
142     : addr_(addr)
143     , waitMask_(waitMask)
144     , signaled_(false)
145   {
146   }
147 };
148
149 struct EmulatedFutexBucket {
150   std::mutex mutex_;
151   boost::intrusive::list<EmulatedFutexWaitNode> waiters_;
152
153   static const size_t kNumBuckets = 4096;
154   static EmulatedFutexBucket* gBuckets;
155   static std::once_flag gBucketInit;
156
157   static EmulatedFutexBucket& bucketFor(void* addr) {
158     std::call_once(gBucketInit, [](){
159       gBuckets = new EmulatedFutexBucket[kNumBuckets];
160     });
161     uint64_t mixedBits = folly::hash::twang_mix64(
162         reinterpret_cast<uintptr_t>(addr));
163     return gBuckets[mixedBits % kNumBuckets];
164   }
165 };
166
167 EmulatedFutexBucket* EmulatedFutexBucket::gBuckets;
168 std::once_flag EmulatedFutexBucket::gBucketInit;
169
170 int emulatedFutexWake(void* addr, int count, uint32_t waitMask) {
171   auto& bucket = EmulatedFutexBucket::bucketFor(addr);
172   std::unique_lock<std::mutex> bucketLock(bucket.mutex_);
173
174   int numAwoken = 0;
175   for (auto iter = bucket.waiters_.begin();
176        numAwoken < count && iter != bucket.waiters_.end(); ) {
177     auto current = iter;
178     auto& node = *iter++;
179     if (node.addr_ == addr && (node.waitMask_ & waitMask) != 0) {
180       ++numAwoken;
181
182       // we unlink, but waiter destroys the node
183       bucket.waiters_.erase(current);
184
185       std::unique_lock<std::mutex> nodeLock(node.mutex_);
186       node.signaled_ = true;
187       node.cond_.notify_one();
188     }
189   }
190   return numAwoken;
191 }
192
193 FutexResult emulatedFutexWaitImpl(
194         void* addr,
195         uint32_t expected,
196         time_point<system_clock>* absSystemTime,
197         time_point<steady_clock>* absSteadyTime,
198         uint32_t waitMask) {
199   auto& bucket = EmulatedFutexBucket::bucketFor(addr);
200   EmulatedFutexWaitNode node(addr, waitMask);
201
202   {
203     std::unique_lock<std::mutex> bucketLock(bucket.mutex_);
204
205     uint32_t actual;
206     memcpy(&actual, addr, sizeof(uint32_t));
207     if (actual != expected) {
208       return FutexResult::VALUE_CHANGED;
209     }
210
211     bucket.waiters_.push_back(node);
212   } // bucketLock scope
213
214   std::cv_status status = std::cv_status::no_timeout;
215   {
216     std::unique_lock<std::mutex> nodeLock(node.mutex_);
217     while (!node.signaled_ && status != std::cv_status::timeout) {
218       if (absSystemTime != nullptr) {
219         status = node.cond_.wait_until(nodeLock, *absSystemTime);
220       } else if (absSteadyTime != nullptr) {
221         status = node.cond_.wait_until(nodeLock, *absSteadyTime);
222       } else {
223         node.cond_.wait(nodeLock);
224       }
225     }
226   } // nodeLock scope
227
228   if (status == std::cv_status::timeout) {
229     // it's not really a timeout until we unlink the unsignaled node
230     std::unique_lock<std::mutex> bucketLock(bucket.mutex_);
231     if (!node.signaled_) {
232       bucket.waiters_.erase(bucket.waiters_.iterator_to(node));
233       return FutexResult::TIMEDOUT;
234     }
235   }
236   return FutexResult::AWOKEN;
237 }
238
239 } // anon namespace
240
241
242 /////////////////////////////////
243 // Futex<> specializations
244
245 template <>
246 int
247 Futex<std::atomic>::futexWake(int count, uint32_t wakeMask) {
248 #ifdef __linux__
249   return nativeFutexWake(this, count, wakeMask);
250 #else
251   return emulatedFutexWake(this, count, wakeMask);
252 #endif
253 }
254
255 template <>
256 int
257 Futex<EmulatedFutexAtomic>::futexWake(int count, uint32_t wakeMask) {
258   return emulatedFutexWake(this, count, wakeMask);
259 }
260
261 template <>
262 FutexResult
263 Futex<std::atomic>::futexWaitImpl(uint32_t expected,
264                                   time_point<system_clock>* absSystemTime,
265                                   time_point<steady_clock>* absSteadyTime,
266                                   uint32_t waitMask) {
267 #ifdef __linux__
268   return nativeFutexWaitImpl(
269       this, expected, absSystemTime, absSteadyTime, waitMask);
270 #else
271   return emulatedFutexWaitImpl(
272       this, expected, absSystemTime, absSteadyTime, waitMask);
273 #endif
274 }
275
276 template <>
277 FutexResult
278 Futex<EmulatedFutexAtomic>::futexWaitImpl(
279         uint32_t expected,
280         time_point<system_clock>* absSystemTime,
281         time_point<steady_clock>* absSteadyTime,
282         uint32_t waitMask) {
283   return emulatedFutexWaitImpl(
284       this, expected, absSystemTime, absSteadyTime, waitMask);
285 }
286
287 }} // namespace folly::detail