folly: futexWaitUntilImpl: avoid abort triggered via tao/queues tests
folly/test/DeterministicSchedule.cpp
/*
 * Copyright 2014 Facebook, Inc.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *   http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

#include "DeterministicSchedule.h"
#include <algorithm>
#include <list>
#include <mutex>
#include <random>
#include <utility>
#include <unordered_map>
#include <assert.h>
#include <errno.h>   // errno, EAGAIN, ETIMEDOUT, EINTR, EWOULDBLOCK
#include <cstdlib>   // std::rand

namespace folly { namespace test {

FOLLY_TLS sem_t* DeterministicSchedule::tls_sem;
FOLLY_TLS DeterministicSchedule* DeterministicSchedule::tls_sched;

// access is protected by futexLock
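// Each futex maps to the list of waiters currently blocked on it.  An entry
// pairs the waiter's waitMask with a pointer to a "woken" flag, which
// futexWake() sets to release that waiter.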
static std::unordered_map<detail::Futex<DeterministicAtomic>*,
                          std::list<std::pair<uint32_t,bool*>>> futexQueues;

static std::mutex futexLock;

DeterministicSchedule::DeterministicSchedule(
        const std::function<int(int)>& scheduler)
  : scheduler_(scheduler)
{
  assert(tls_sem == nullptr);
  assert(tls_sched == nullptr);

  tls_sem = new sem_t;
  sem_init(tls_sem, 0, 1);
  sems_.push_back(tls_sem);

  tls_sched = this;
}

DeterministicSchedule::~DeterministicSchedule() {
  assert(tls_sched == this);
  assert(sems_.size() == 1);
  assert(sems_[0] == tls_sem);
  beforeThreadExit();
}
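
// A minimal usage sketch (hypothetical test code, assuming the
// DeterministicSchedule::thread() helper declared in DeterministicSchedule.h):
//
//   DeterministicSchedule sched(DeterministicSchedule::uniform(12345));
//   auto t = DeterministicSchedule::thread([] {
//     // exercise code built on DeterministicAtomic here
//   });
//   DeterministicSchedule::join(t);
//
// The scheduler function passed to the constructor (uniform() below is the
// simplest) picks which runnable thread advances at each step.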

std::function<int(int)>
DeterministicSchedule::uniform(long seed) {
  auto rand = std::make_shared<std::ranlux48>(seed);
  return [rand](int numActive) {
    auto dist = std::uniform_int_distribution<int>(0, numActive - 1);
    return dist(*rand);
  };
}

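// UniformSubset restricts scheduling to a rotating subset of the runnable
// threads: it keeps a permutation of thread indices, picks uniformly among the
// first subsetSize_ entries (or all threads, if fewer are active), and
// reshuffles that prefix every stepsBetweenSelect_ selections, so only a few
// threads run at a time before the active subset changes.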
struct UniformSubset {
  UniformSubset(long seed, int subsetSize, int stepsBetweenSelect)
    : uniform_(DeterministicSchedule::uniform(seed))
    , subsetSize_(subsetSize)
    , stepsBetweenSelect_(stepsBetweenSelect)
    , stepsLeft_(0)
  {
  }

  int operator()(int numActive) {
    adjustPermSize(numActive);
    if (stepsLeft_-- == 0) {
      stepsLeft_ = stepsBetweenSelect_ - 1;
      shufflePrefix();
    }
    return perm_[uniform_(std::min(numActive, subsetSize_))];
  }

 private:
  std::function<int(int)> uniform_;
  const int subsetSize_;
  const int stepsBetweenSelect_;

  int stepsLeft_;
  // only the first subsetSize_ entries are properly randomized
  std::vector<int> perm_;

  void adjustPermSize(int numActive) {
    if (perm_.size() > size_t(numActive)) {
      perm_.erase(std::remove_if(perm_.begin(), perm_.end(),
              [=](int x){ return x >= numActive; }), perm_.end());
    } else {
      while (perm_.size() < size_t(numActive)) {
        perm_.push_back(int(perm_.size()));
      }
    }
    assert(perm_.size() == size_t(numActive));
  }

  void shufflePrefix() {
    for (int i = 0; i < std::min(int(perm_.size() - 1), subsetSize_); ++i) {
      int j = uniform_(perm_.size() - i) + i;
      std::swap(perm_[i], perm_[j]);
    }
  }
};

std::function<int(int)>
DeterministicSchedule::uniformSubset(long seed, int n, int m) {
  auto gen = std::make_shared<UniformSubset>(seed, n, m);
  return [=](int numActive) { return (*gen)(numActive); };
}

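// Scheduling is cooperative: beforeSharedAccess() blocks the calling thread on
// its own semaphore, and afterSharedAccess() posts the semaphore of whichever
// thread the scheduler function selects, so only one managed thread runs
// between consecutive scheduling points.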
void
DeterministicSchedule::beforeSharedAccess() {
  if (tls_sem) {
    sem_wait(tls_sem);
  }
}

void
DeterministicSchedule::afterSharedAccess() {
  auto sched = tls_sched;
  if (!sched) {
    return;
  }

  sem_post(sched->sems_[sched->scheduler_(sched->sems_.size())]);
}

int
DeterministicSchedule::getRandNumber(int n) {
  if (tls_sched) {
    return tls_sched->scheduler_(n);
  }
  return std::rand() % n;
}

sem_t*
DeterministicSchedule::beforeThreadCreate() {
  sem_t* s = new sem_t;
  sem_init(s, 0, 0);
  beforeSharedAccess();
  sems_.push_back(s);
  afterSharedAccess();
  return s;
}

void
DeterministicSchedule::afterThreadCreate(sem_t* sem) {
  assert(tls_sem == nullptr);
  assert(tls_sched == nullptr);
  tls_sem = sem;
  tls_sched = this;
  bool started = false;
  while (!started) {
    beforeSharedAccess();
    if (active_.count(std::this_thread::get_id()) == 1) {
      started = true;
    }
    afterSharedAccess();
  }
}

void
DeterministicSchedule::beforeThreadExit() {
  assert(tls_sched == this);
  beforeSharedAccess();
  sems_.erase(std::find(sems_.begin(), sems_.end(), tls_sem));
  active_.erase(std::this_thread::get_id());
  if (sems_.size() > 0) {
    afterSharedAccess();
  }
  sem_destroy(tls_sem);
  delete tls_sem;
  tls_sem = nullptr;
  tls_sched = nullptr;
}

void
DeterministicSchedule::join(std::thread& child) {
  auto sched = tls_sched;
  if (sched) {
    bool done = false;
    while (!done) {
      beforeSharedAccess();
      done = !sched->active_.count(child.get_id());
      afterSharedAccess();
    }
  }
  child.join();
}

void
DeterministicSchedule::post(sem_t* sem) {
  beforeSharedAccess();
  sem_post(sem);
  afterSharedAccess();
}

bool
DeterministicSchedule::tryWait(sem_t* sem) {
  beforeSharedAccess();
  int rv = sem_trywait(sem);
  afterSharedAccess();
  if (rv == 0) {
    return true;
  } else {
    assert(errno == EAGAIN);
    return false;
  }
}

void
DeterministicSchedule::wait(sem_t* sem) {
  while (!tryWait(sem)) {
    // this is not a true busy-wait: each tryWait() blocks on the caller's own
    // semaphore and yields to the deterministic scheduler
  }
}

}}

namespace folly { namespace detail {

using namespace test;

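// futexWait for DeterministicAtomic never touches the kernel: the waiter
// enqueues itself on this futex's queue and repeatedly yields to the
// scheduler until futexWake() flips its "done" flag.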
template<>
bool Futex<DeterministicAtomic>::futexWait(uint32_t expected,
                                           uint32_t waitMask) {
  bool rv;
  DeterministicSchedule::beforeSharedAccess();
  futexLock.lock();
  if (data != expected) {
    rv = false;
  } else {
    auto& queue = futexQueues[this];
    bool done = false;
    queue.push_back(std::make_pair(waitMask, &done));
    while (!done) {
      futexLock.unlock();
      DeterministicSchedule::afterSharedAccess();
      DeterministicSchedule::beforeSharedAccess();
      futexLock.lock();
    }
    rv = true;
  }
  futexLock.unlock();
  DeterministicSchedule::afterSharedAccess();
  return rv;
}

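// futexWaitUntilImpl mirrors futexWait but also models timeouts and spurious
// wake-ups: while queued, each pass through the scheduler has a small chance
// of giving up with ETIMEDOUT or EINTR, so callers exercise their error paths
// deterministically instead of blocking forever.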
FutexResult futexWaitUntilImpl(Futex<DeterministicAtomic>* futex,
                               uint32_t expected, uint32_t waitMask) {
  if (futex == nullptr) {
    return FutexResult::VALUE_CHANGED;
  }

  bool rv = false;
  int futexErrno = 0;

  DeterministicSchedule::beforeSharedAccess();
  futexLock.lock();
  if (futex->data == expected) {
    auto& queue = futexQueues[futex];
    queue.push_back(std::make_pair(waitMask, &rv));
    auto ours = queue.end();
    ours--;
    while (!rv) {
      futexLock.unlock();
      DeterministicSchedule::afterSharedAccess();
      DeterministicSchedule::beforeSharedAccess();
      futexLock.lock();

      // Simulate a spurious wake-up or timeout with 10% probability on each
      // pass, unless we have already been woken
      if (!rv && DeterministicSchedule::getRandNumber(100) < 10) {
        assert(futexQueues.count(futex) != 0 &&
               &futexQueues[futex] == &queue);
        queue.erase(ours);
        if (queue.empty()) {
          futexQueues.erase(futex);
        }
        rv = false;
        // Report ETIMEDOUT 90% of the time; simulate other interruptions
        // (EINTR) the remaining 10%
        futexErrno =
          DeterministicSchedule::getRandNumber(100) >= 10 ? ETIMEDOUT : EINTR;
        break;
      }
    }
  } else {
    futexErrno = EWOULDBLOCK;
  }
  futexLock.unlock();
  DeterministicSchedule::afterSharedAccess();
  return futexErrnoToFutexResult(rv ? 0 : -1, futexErrno);
}

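// futexWake scans the futex's queue and wakes up to count waiters whose
// waitMask intersects wakeMask, returning how many were woken.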
template<>
int Futex<DeterministicAtomic>::futexWake(int count, uint32_t wakeMask) {
  int rv = 0;
  DeterministicSchedule::beforeSharedAccess();
  futexLock.lock();
  if (futexQueues.count(this) > 0) {
    auto& queue = futexQueues[this];
    auto iter = queue.begin();
    while (iter != queue.end() && rv < count) {
      auto cur = iter++;
      if ((cur->first & wakeMask) != 0) {
        *(cur->second) = true;
        rv++;
        queue.erase(cur);
      }
    }
    if (queue.empty()) {
      futexQueues.erase(this);
    }
  }
  futexLock.unlock();
  DeterministicSchedule::afterSharedAccess();
  return rv;
}


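// The specializations below route CacheLocality and AccessSpreader through
// DeterministicAtomic and a sequential thread-id based getcpu, so that
// cache-locality-aware data structures behave deterministically under test.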
template<>
CacheLocality const& CacheLocality::system<test::DeterministicAtomic>() {
  static CacheLocality cache(CacheLocality::uniform(16));
  return cache;
}

template<>
test::DeterministicAtomic<size_t>
    SequentialThreadId<test::DeterministicAtomic>::prevId(0);

template<>
FOLLY_TLS size_t
    SequentialThreadId<test::DeterministicAtomic>::currentId(0);

template<>
const AccessSpreader<test::DeterministicAtomic>
AccessSpreader<test::DeterministicAtomic>::stripeByCore(
    CacheLocality::system<>().numCachesByLevel.front());

template<>
const AccessSpreader<test::DeterministicAtomic>
AccessSpreader<test::DeterministicAtomic>::stripeByChip(
    CacheLocality::system<>().numCachesByLevel.back());

template<>
AccessSpreaderArray<test::DeterministicAtomic,128>
AccessSpreaderArray<test::DeterministicAtomic,128>::sharedInstance = {};


template<>
Getcpu::Func
AccessSpreader<test::DeterministicAtomic>::pickGetcpuFunc(size_t numStripes) {
  return &SequentialThreadId<test::DeterministicAtomic>::getcpu;
}

}}