74d4dcdf4089270c249bc5a11ebdb9bdc01d8e83
[folly.git] / folly / test / DeterministicSchedule.cpp
1 /*
2  * Copyright 2016 Facebook, Inc.
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  *   http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16
17 #include <folly/test/DeterministicSchedule.h>
18 #include <algorithm>
19 #include <list>
20 #include <mutex>
21 #include <random>
22 #include <utility>
23 #include <unordered_map>
24 #include <assert.h>
25
26 namespace folly {
27 namespace test {
28
// Per-thread state (declared with FOLLY_TLS).
//
// tls_sem is the semaphore the current thread waits on in
// beforeSharedAccess(); tls_sched is the schedule the current thread is
// attached to.  Both are null for a thread that has no schedule installed.
FOLLY_TLS sem_t* DeterministicSchedule::tls_sem;
FOLLY_TLS DeterministicSchedule* DeterministicSchedule::tls_sched;
// Assigned lazily by getcpu(); zero means "no id assigned yet".
FOLLY_TLS unsigned DeterministicSchedule::tls_threadId;

// access is protected by futexLock
//
// For each futex that currently has waiters: the list of waiters, each
// recorded as (waitMask, pointer to the waiter's "awoken" flag).
static std::unordered_map<detail::Futex<DeterministicAtomic>*,
                          std::list<std::pair<uint32_t, bool*>>> futexQueues;

// Guards futexQueues and the "awoken" flags that its entries point to.
static std::mutex futexLock;
38
39 DeterministicSchedule::DeterministicSchedule(
40     const std::function<int(int)>& scheduler)
41     : scheduler_(scheduler), nextThreadId_(1) {
42   assert(tls_sem == nullptr);
43   assert(tls_sched == nullptr);
44
45   tls_sem = new sem_t;
46   sem_init(tls_sem, 0, 1);
47   sems_.push_back(tls_sem);
48
49   tls_sched = this;
50 }
51
// Tears the schedule down.  Must run on a thread attached to this schedule,
// and only once that thread's semaphore is the sole entry left in sems_.
DeterministicSchedule::~DeterministicSchedule() {
  assert(tls_sched == this);
  assert(sems_.size() == 1);
  assert(sems_[0] == tls_sem);
  // Unregisters and destroys this thread's semaphore and clears the
  // per-thread pointers.
  beforeThreadExit();
}
58
59 std::function<int(int)> DeterministicSchedule::uniform(long seed) {
60   auto rand = std::make_shared<std::ranlux48>(seed);
61   return [rand](size_t numActive) {
62     auto dist = std::uniform_int_distribution<int>(0, numActive - 1);
63     return dist(*rand);
64   };
65 }
66
67 struct UniformSubset {
68   UniformSubset(long seed, int subsetSize, int stepsBetweenSelect)
69       : uniform_(DeterministicSchedule::uniform(seed)),
70         subsetSize_(subsetSize),
71         stepsBetweenSelect_(stepsBetweenSelect),
72         stepsLeft_(0) {}
73
74   size_t operator()(size_t numActive) {
75     adjustPermSize(numActive);
76     if (stepsLeft_-- == 0) {
77       stepsLeft_ = stepsBetweenSelect_ - 1;
78       shufflePrefix();
79     }
80     return perm_[uniform_(std::min(numActive, subsetSize_))];
81   }
82
83  private:
84   std::function<int(int)> uniform_;
85   const size_t subsetSize_;
86   const int stepsBetweenSelect_;
87
88   int stepsLeft_;
89   // only the first subsetSize_ is properly randomized
90   std::vector<int> perm_;
91
92   void adjustPermSize(size_t numActive) {
93     if (perm_.size() > numActive) {
94       perm_.erase(std::remove_if(perm_.begin(),
95                                  perm_.end(),
96                                  [=](size_t x) { return x >= numActive; }),
97                   perm_.end());
98     } else {
99       while (perm_.size() < numActive) {
100         perm_.push_back(perm_.size());
101       }
102     }
103     assert(perm_.size() == numActive);
104   }
105
106   void shufflePrefix() {
107     for (size_t i = 0; i < std::min(perm_.size() - 1, subsetSize_); ++i) {
108       int j = uniform_(perm_.size() - i) + i;
109       std::swap(perm_[i], perm_[j]);
110     }
111   }
112 };
113
114 std::function<int(int)> DeterministicSchedule::uniformSubset(long seed,
115                                                              int n,
116                                                              int m) {
117   auto gen = std::make_shared<UniformSubset>(seed, n, m);
118   return [=](size_t numActive) { return (*gen)(numActive); };
119 }
120
121 void DeterministicSchedule::beforeSharedAccess() {
122   if (tls_sem) {
123     sem_wait(tls_sem);
124   }
125 }
126
127 void DeterministicSchedule::afterSharedAccess() {
128   auto sched = tls_sched;
129   if (!sched) {
130     return;
131   }
132
133   sem_post(sched->sems_[sched->scheduler_(sched->sems_.size())]);
134 }
135
136 int DeterministicSchedule::getRandNumber(int n) {
137   if (tls_sched) {
138     return tls_sched->scheduler_(n);
139   }
140   return std::rand() % n;
141 }
142
143 int DeterministicSchedule::getcpu(unsigned* cpu,
144                                   unsigned* node,
145                                   void* /* unused */) {
146   if (!tls_threadId && tls_sched) {
147     beforeSharedAccess();
148     tls_threadId = tls_sched->nextThreadId_++;
149     afterSharedAccess();
150   }
151   if (cpu) {
152     *cpu = tls_threadId;
153   }
154   if (node) {
155     *node = tls_threadId;
156   }
157   return 0;
158 }
159
160 sem_t* DeterministicSchedule::beforeThreadCreate() {
161   sem_t* s = new sem_t;
162   sem_init(s, 0, 0);
163   beforeSharedAccess();
164   sems_.push_back(s);
165   afterSharedAccess();
166   return s;
167 }
168
169 void DeterministicSchedule::afterThreadCreate(sem_t* sem) {
170   assert(tls_sem == nullptr);
171   assert(tls_sched == nullptr);
172   tls_sem = sem;
173   tls_sched = this;
174   bool started = false;
175   while (!started) {
176     beforeSharedAccess();
177     if (active_.count(std::this_thread::get_id()) == 1) {
178       started = true;
179     }
180     afterSharedAccess();
181   }
182 }
183
184 void DeterministicSchedule::beforeThreadExit() {
185   assert(tls_sched == this);
186   beforeSharedAccess();
187   sems_.erase(std::find(sems_.begin(), sems_.end(), tls_sem));
188   active_.erase(std::this_thread::get_id());
189   if (sems_.size() > 0) {
190     FOLLY_TEST_DSCHED_VLOG("exiting");
191     afterSharedAccess();
192   }
193   sem_destroy(tls_sem);
194   delete tls_sem;
195   tls_sem = nullptr;
196   tls_sched = nullptr;
197 }
198
199 void DeterministicSchedule::join(std::thread& child) {
200   auto sched = tls_sched;
201   if (sched) {
202     bool done = false;
203     while (!done) {
204       beforeSharedAccess();
205       done = !sched->active_.count(child.get_id());
206       if (done) {
207         FOLLY_TEST_DSCHED_VLOG("joined " << std::hex << child.get_id());
208       }
209       afterSharedAccess();
210     }
211   }
212   child.join();
213 }
214
215 void DeterministicSchedule::post(sem_t* sem) {
216   beforeSharedAccess();
217   sem_post(sem);
218   FOLLY_TEST_DSCHED_VLOG("sem_post(" << sem << ")");
219   afterSharedAccess();
220 }
221
222 bool DeterministicSchedule::tryWait(sem_t* sem) {
223   beforeSharedAccess();
224   int rv = sem_trywait(sem);
225   int e = rv == 0 ? 0 : errno;
226   FOLLY_TEST_DSCHED_VLOG("sem_trywait(" << sem << ") = " << rv
227                                         << " errno=" << e);
228   afterSharedAccess();
229   if (rv == 0) {
230     return true;
231   } else {
232     assert(e == EAGAIN);
233     return false;
234   }
235 }
236
237 void DeterministicSchedule::wait(sem_t* sem) {
238   while (!tryWait(sem)) {
239     // we're not busy waiting because this is a deterministic schedule
240   }
241 }
242 }
243 }
244
245 namespace folly {
246 namespace detail {
247
248 using namespace test;
249 using namespace std::chrono;
250
251 template <>
252 FutexResult Futex<DeterministicAtomic>::futexWaitImpl(
253     uint32_t expected,
254     time_point<system_clock>* absSystemTimeout,
255     time_point<steady_clock>* absSteadyTimeout,
256     uint32_t waitMask) {
257   bool hasTimeout = absSystemTimeout != nullptr || absSteadyTimeout != nullptr;
258   bool awoken = false;
259   FutexResult result = FutexResult::AWOKEN;
260
261   DeterministicSchedule::beforeSharedAccess();
262   FOLLY_TEST_DSCHED_VLOG(this << ".futexWait(" << std::hex << expected
263                               << ", .., " << std::hex << waitMask
264                               << ") beginning..");
265   futexLock.lock();
266   if (data == expected) {
267     auto& queue = futexQueues[this];
268     queue.emplace_back(waitMask, &awoken);
269     auto ours = queue.end();
270     ours--;
271     while (!awoken) {
272       futexLock.unlock();
273       DeterministicSchedule::afterSharedAccess();
274       DeterministicSchedule::beforeSharedAccess();
275       futexLock.lock();
276
277       // Simulate spurious wake-ups, timeouts each time with
278       // a 10% probability if we haven't been woken up already
279       if (!awoken && hasTimeout &&
280           DeterministicSchedule::getRandNumber(100) < 10) {
281         assert(futexQueues.count(this) != 0 && &futexQueues[this] == &queue);
282         queue.erase(ours);
283         if (queue.empty()) {
284           futexQueues.erase(this);
285         }
286         // Simulate ETIMEDOUT 90% of the time and other failures
287         // remaining time
288         result = DeterministicSchedule::getRandNumber(100) >= 10
289                      ? FutexResult::TIMEDOUT
290                      : FutexResult::INTERRUPTED;
291         break;
292       }
293     }
294   } else {
295     result = FutexResult::VALUE_CHANGED;
296   }
297   futexLock.unlock();
298
299   char const* resultStr = "?";
300   switch (result) {
301     case FutexResult::AWOKEN:
302       resultStr = "AWOKEN";
303       break;
304     case FutexResult::TIMEDOUT:
305       resultStr = "TIMEDOUT";
306       break;
307     case FutexResult::INTERRUPTED:
308       resultStr = "INTERRUPTED";
309       break;
310     case FutexResult::VALUE_CHANGED:
311       resultStr = "VALUE_CHANGED";
312       break;
313   }
314   FOLLY_TEST_DSCHED_VLOG(this << ".futexWait(" << std::hex << expected
315                               << ", .., " << std::hex << waitMask << ") -> "
316                               << resultStr);
317   DeterministicSchedule::afterSharedAccess();
318   return result;
319 }
320
321 template <>
322 int Futex<DeterministicAtomic>::futexWake(int count, uint32_t wakeMask) {
323   int rv = 0;
324   DeterministicSchedule::beforeSharedAccess();
325   futexLock.lock();
326   if (futexQueues.count(this) > 0) {
327     auto& queue = futexQueues[this];
328     auto iter = queue.begin();
329     while (iter != queue.end() && rv < count) {
330       auto cur = iter++;
331       if ((cur->first & wakeMask) != 0) {
332         *(cur->second) = true;
333         rv++;
334         queue.erase(cur);
335       }
336     }
337     if (queue.empty()) {
338       futexQueues.erase(this);
339     }
340   }
341   futexLock.unlock();
342   FOLLY_TEST_DSCHED_VLOG(this << ".futexWake(" << count << ", " << std::hex
343                               << wakeMask << ") -> " << rv);
344   DeterministicSchedule::afterSharedAccess();
345   return rv;
346 }
347
348 template <>
349 CacheLocality const& CacheLocality::system<test::DeterministicAtomic>() {
350   static CacheLocality cache(CacheLocality::uniform(16));
351   return cache;
352 }
353
// Static AccessSpreader members for DeterministicAtomic.
//
// stripeByCore is built from the first entry of numCachesByLevel and
// stripeByChip from the last.
// NOTE(review): both call CacheLocality::system<>(), i.e. with the template's
// default argument, not the system<test::DeterministicAtomic>()
// specialization defined in this file -- confirm that this is intended.
template <>
const AccessSpreader<test::DeterministicAtomic>
    AccessSpreader<test::DeterministicAtomic>::stripeByCore(
        CacheLocality::system<>().numCachesByLevel.front());

template <>
const AccessSpreader<test::DeterministicAtomic>
    AccessSpreader<test::DeterministicAtomic>::stripeByChip(
        CacheLocality::system<>().numCachesByLevel.back());

// Definition of the shared spreader array for the <DeterministicAtomic, 128>
// instantiation, initialised from an empty brace initialiser.
template <>
AccessSpreaderArray<test::DeterministicAtomic, 128>
    AccessSpreaderArray<test::DeterministicAtomic, 128>::sharedInstance = {};
367
368 template <>
369 Getcpu::Func AccessSpreader<test::DeterministicAtomic>::pickGetcpuFunc(
370     size_t /* numStripes */) {
371   return &DeterministicSchedule::getcpu;
372 }
373 }
374 }