/*
 * Copyright 2017 Facebook, Inc.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *   http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

#include <folly/test/DeterministicSchedule.h>

#include <assert.h>

#include <algorithm>
#include <list>
#include <mutex>
#include <random>
#include <unordered_map>
#include <utility>

#include <folly/Random.h>

namespace folly {
namespace test {

FOLLY_TLS sem_t* DeterministicSchedule::tls_sem;
FOLLY_TLS DeterministicSchedule* DeterministicSchedule::tls_sched;
FOLLY_TLS unsigned DeterministicSchedule::tls_threadId;
thread_local AuxAct DeterministicSchedule::tls_aux_act;
AuxChk DeterministicSchedule::aux_chk;

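// Wait queues for the simulated futexes: each entry maps a futex to the
// list of (waitMask, awoken-flag) pairs of the threads blocked on it.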
// Access is protected by futexLock.
static std::unordered_map<detail::Futex<DeterministicAtomic>*,
                          std::list<std::pair<uint32_t, bool*>>> futexQueues;

static std::mutex futexLock;

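// A minimal usage sketch (assuming the DeterministicSchedule::thread()
// wrapper declared in DeterministicSchedule.h):
//
//   DeterministicSchedule sched(DeterministicSchedule::uniform(12345));
//   std::thread child = DeterministicSchedule::thread([] { /* test body */ });
//   DeterministicSchedule::join(child);
//
// The constructor registers the calling thread as the first runnable thread:
// its semaphore starts at 1, so it already holds the token that
// beforeSharedAccess() waits for.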
DeterministicSchedule::DeterministicSchedule(
    const std::function<int(int)>& scheduler)
    : scheduler_(scheduler), nextThreadId_(1), step_(0) {
  assert(tls_sem == nullptr);
  assert(tls_sched == nullptr);
  assert(tls_aux_act == nullptr);

  tls_sem = new sem_t;
  sem_init(tls_sem, 0, 1);
  sems_.push_back(tls_sem);

  tls_sched = this;
}

DeterministicSchedule::~DeterministicSchedule() {
  assert(tls_sched == this);
  assert(sems_.size() == 1);
  assert(sems_[0] == tls_sem);
  beforeThreadExit();
}

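// Returns a scheduler that picks uniformly at random among the active
// threads, driven by a seeded ranlux48 engine so a given seed always
// reproduces the same schedule.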
std::function<int(int)> DeterministicSchedule::uniform(long seed) {
  auto rand = std::make_shared<std::ranlux48>(seed);
  return [rand](size_t numActive) {
    auto dist = std::uniform_int_distribution<int>(0, numActive - 1);
    return dist(*rand);
  };
}

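// Scheduler functor that restricts each choice to a randomly chosen subset
// of at most subsetSize_ active threads, reshuffling that subset every
// stepsBetweenSelect_ invocations.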
struct UniformSubset {
  UniformSubset(long seed, int subsetSize, int stepsBetweenSelect)
      : uniform_(DeterministicSchedule::uniform(seed)),
        subsetSize_(subsetSize),
        stepsBetweenSelect_(stepsBetweenSelect),
        stepsLeft_(0) {}

  size_t operator()(size_t numActive) {
    adjustPermSize(numActive);
    if (stepsLeft_-- == 0) {
      stepsLeft_ = stepsBetweenSelect_ - 1;
      shufflePrefix();
    }
    return perm_[uniform_(std::min(numActive, subsetSize_))];
  }

 private:
  std::function<int(int)> uniform_;
  const size_t subsetSize_;
  const int stepsBetweenSelect_;

  int stepsLeft_;
  // only the first subsetSize_ entries of perm_ are properly randomized
  std::vector<int> perm_;

  void adjustPermSize(size_t numActive) {
    if (perm_.size() > numActive) {
      perm_.erase(std::remove_if(perm_.begin(),
                                 perm_.end(),
                                 [=](size_t x) { return x >= numActive; }),
                  perm_.end());
    } else {
      while (perm_.size() < numActive) {
        perm_.push_back(perm_.size());
      }
    }
    assert(perm_.size() == numActive);
  }

  void shufflePrefix() {
    for (size_t i = 0; i < std::min(perm_.size() - 1, subsetSize_); ++i) {
      int j = uniform_(perm_.size() - i) + i;
      std::swap(perm_[i], perm_[j]);
    }
  }
};

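// Wraps UniformSubset in a copyable std::function; n is the subset size and
// m is the number of steps between reshuffles.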
std::function<int(int)> DeterministicSchedule::uniformSubset(long seed,
                                                             int n,
                                                             int m) {
  auto gen = std::make_shared<UniformSubset>(seed, n, m);
  return [=](size_t numActive) { return (*gen)(numActive); };
}

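// beforeSharedAccess() blocks on the calling thread's semaphore, so only the
// thread whose semaphore was last posted may proceed; afterSharedAccess()
// hands the token to whichever thread the scheduler selects next.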
void DeterministicSchedule::beforeSharedAccess() {
  if (tls_sem) {
    sem_wait(tls_sem);
  }
}

void DeterministicSchedule::afterSharedAccess() {
  auto sched = tls_sched;
  if (!sched) {
    return;
  }
  sem_post(sched->sems_[sched->scheduler_(sched->sems_.size())]);
}

void DeterministicSchedule::afterSharedAccess(bool success) {
  auto sched = tls_sched;
  if (!sched) {
    return;
  }
  sched->callAux(success);
  sem_post(sched->sems_[sched->scheduler_(sched->sems_.size())]);
}

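// Returns a random number in [0, n). Under an active schedule the value
// comes from the deterministic scheduler; otherwise it falls back to
// folly::Random.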
int DeterministicSchedule::getRandNumber(int n) {
  if (tls_sched) {
    return tls_sched->scheduler_(n);
  }
  return Random::rand32() % n;
}

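// Deterministic stand-in for getcpu(2): lazily assigns the calling thread a
// stable id and reports it as both the cpu and the node.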
int DeterministicSchedule::getcpu(unsigned* cpu,
                                  unsigned* node,
                                  void* /* unused */) {
  if (!tls_threadId && tls_sched) {
    beforeSharedAccess();
    tls_threadId = tls_sched->nextThreadId_++;
    afterSharedAccess();
  }
  if (cpu) {
    *cpu = tls_threadId;
  }
  if (node) {
    *node = tls_threadId;
  }
  return 0;
}

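// Hooks for tests: the per-thread auxiliary action runs once at the next
// afterSharedAccess(bool) and is then cleared; the global auxiliary checker
// runs after every step until clearAuxChk() is called.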
void DeterministicSchedule::setAuxAct(AuxAct& aux) {
  tls_aux_act = aux;
}

void DeterministicSchedule::setAuxChk(AuxChk& aux) {
  aux_chk = aux;
}

void DeterministicSchedule::clearAuxChk() {
  aux_chk = nullptr;
}

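// beforeThreadCreate() runs in the parent: it allocates the child's
// semaphore (initially 0, so the child starts blocked) and registers it.
// afterThreadCreate() runs in the child: it adopts that semaphore and spins
// deterministically until this thread has been recorded in active_.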
sem_t* DeterministicSchedule::beforeThreadCreate() {
  sem_t* s = new sem_t;
  sem_init(s, 0, 0);
  beforeSharedAccess();
  sems_.push_back(s);
  afterSharedAccess();
  return s;
}

void DeterministicSchedule::afterThreadCreate(sem_t* sem) {
  assert(tls_sem == nullptr);
  assert(tls_sched == nullptr);
  tls_sem = sem;
  tls_sched = this;
  bool started = false;
  while (!started) {
    beforeSharedAccess();
    if (active_.count(std::this_thread::get_id()) == 1) {
      started = true;
    }
    afterSharedAccess();
  }
}

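// Deregisters the calling thread: removes its semaphore and active_ entry,
// passes the token on if any other thread remains, and clears its TLS state.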
void DeterministicSchedule::beforeThreadExit() {
  assert(tls_sched == this);
  beforeSharedAccess();
  sems_.erase(std::find(sems_.begin(), sems_.end(), tls_sem));
  active_.erase(std::this_thread::get_id());
  if (sems_.size() > 0) {
    FOLLY_TEST_DSCHED_VLOG("exiting");
    afterSharedAccess();
  }
  sem_destroy(tls_sem);
  delete tls_sem;
  tls_sem = nullptr;
  tls_sched = nullptr;
  tls_aux_act = nullptr;
}

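// Waits, under the schedule, until the child has removed itself from
// active_ in beforeThreadExit(), then performs the real std::thread::join.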
void DeterministicSchedule::join(std::thread& child) {
  auto sched = tls_sched;
  if (sched) {
    bool done = false;
    while (!done) {
      beforeSharedAccess();
      done = !sched->active_.count(child.get_id());
      if (done) {
        FOLLY_TEST_DSCHED_VLOG("joined " << std::hex << child.get_id());
      }
      afterSharedAccess();
    }
  }
  child.join();
}

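// Invoked from afterSharedAccess(bool): bumps the global step counter, runs
// the one-shot per-thread aux action, then the global aux checker.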
void DeterministicSchedule::callAux(bool success) {
  ++step_;
  if (tls_aux_act) {
    tls_aux_act(success);
    tls_aux_act = nullptr;
  }
  if (aux_chk) {
    aux_chk(step_);
  }
}

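// Semaphore helpers that perform their operation while holding the schedule
// token, so sem_post/sem_trywait interleavings stay deterministic.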
void DeterministicSchedule::post(sem_t* sem) {
  beforeSharedAccess();
  sem_post(sem);
  FOLLY_TEST_DSCHED_VLOG("sem_post(" << sem << ")");
  afterSharedAccess();
}

bool DeterministicSchedule::tryWait(sem_t* sem) {
  beforeSharedAccess();
  int rv = sem_trywait(sem);
  int e = rv == 0 ? 0 : errno;
  FOLLY_TEST_DSCHED_VLOG("sem_trywait(" << sem << ") = " << rv
                                        << " errno=" << e);
  afterSharedAccess();
  if (rv == 0) {
    return true;
  } else {
    assert(e == EAGAIN);
    return false;
  }
}

void DeterministicSchedule::wait(sem_t* sem) {
  while (!tryWait(sem)) {
    // this is not a busy-wait: tryWait() yields the schedule token on every
    // iteration, so the loop advances only when this thread is scheduled
  }
}
} // namespace test
} // namespace folly

namespace folly {
namespace detail {

using namespace test;
using namespace std::chrono;

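// Simulated futex wait for DeterministicAtomic: if the value still equals
// `expected`, the thread enqueues itself on futexQueues and repeatedly
// yields the schedule token until it is woken; when a timeout was supplied,
// each step has a small chance of simulating TIMEDOUT or INTERRUPTED.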
template <>
FutexResult Futex<DeterministicAtomic>::futexWaitImpl(
    uint32_t expected,
    time_point<system_clock>* absSystemTimeout,
    time_point<steady_clock>* absSteadyTimeout,
    uint32_t waitMask) {
  bool hasTimeout = absSystemTimeout != nullptr || absSteadyTimeout != nullptr;
  bool awoken = false;
  FutexResult result = FutexResult::AWOKEN;

  DeterministicSchedule::beforeSharedAccess();
  FOLLY_TEST_DSCHED_VLOG(this << ".futexWait(" << std::hex << expected
                              << ", .., " << std::hex << waitMask
                              << ") beginning..");
  futexLock.lock();
  if (this->data == expected) {
    auto& queue = futexQueues[this];
    queue.emplace_back(waitMask, &awoken);
    auto ours = queue.end();
    ours--;
    while (!awoken) {
      futexLock.unlock();
      DeterministicSchedule::afterSharedAccess();
      DeterministicSchedule::beforeSharedAccess();
      futexLock.lock();

      // If we haven't been woken up yet and a timeout was requested,
      // simulate a timeout or spurious wake-up with 10% probability on
      // each scheduling step
      if (!awoken && hasTimeout &&
          DeterministicSchedule::getRandNumber(100) < 10) {
        assert(futexQueues.count(this) != 0 && &futexQueues[this] == &queue);
        queue.erase(ours);
        if (queue.empty()) {
          futexQueues.erase(this);
        }
        // Report ETIMEDOUT 90% of the time and other failures
        // (INTERRUPTED) the remaining 10%
        result = DeterministicSchedule::getRandNumber(100) >= 10
                     ? FutexResult::TIMEDOUT
                     : FutexResult::INTERRUPTED;
        break;
      }
    }
  } else {
    result = FutexResult::VALUE_CHANGED;
  }
  futexLock.unlock();

  char const* resultStr = "?";
  switch (result) {
    case FutexResult::AWOKEN:
      resultStr = "AWOKEN";
      break;
    case FutexResult::TIMEDOUT:
      resultStr = "TIMEDOUT";
      break;
    case FutexResult::INTERRUPTED:
      resultStr = "INTERRUPTED";
      break;
    case FutexResult::VALUE_CHANGED:
      resultStr = "VALUE_CHANGED";
      break;
  }
  FOLLY_TEST_DSCHED_VLOG(this << ".futexWait(" << std::hex << expected
                              << ", .., " << std::hex << waitMask << ") -> "
                              << resultStr);
  DeterministicSchedule::afterSharedAccess();
  return result;
}

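// Simulated futex wake: marks as awoken up to `count` waiters whose waitMask
// intersects wakeMask and removes them from this futex's queue.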
template <>
int Futex<DeterministicAtomic>::futexWake(int count, uint32_t wakeMask) {
  int rv = 0;
  DeterministicSchedule::beforeSharedAccess();
  futexLock.lock();
  if (futexQueues.count(this) > 0) {
    auto& queue = futexQueues[this];
    auto iter = queue.begin();
    while (iter != queue.end() && rv < count) {
      auto cur = iter++;
      if ((cur->first & wakeMask) != 0) {
        *(cur->second) = true;
        rv++;
        queue.erase(cur);
      }
    }
    if (queue.empty()) {
      futexQueues.erase(this);
    }
  }
  futexLock.unlock();
  FOLLY_TEST_DSCHED_VLOG(this << ".futexWake(" << count << ", " << std::hex
                              << wakeMask << ") -> " << rv);
  DeterministicSchedule::afterSharedAccess();
  return rv;
}

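// Test-only specializations: pretend the machine has 16 uniform cpus and
// route cpu lookups through DeterministicSchedule::getcpu.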
template <>
CacheLocality const& CacheLocality::system<test::DeterministicAtomic>() {
  static CacheLocality cache(CacheLocality::uniform(16));
  return cache;
}

template <>
Getcpu::Func AccessSpreader<test::DeterministicAtomic>::pickGetcpuFunc() {
  return &DeterministicSchedule::getcpu;
}
} // namespace detail
} // namespace folly