folly copyright 2015 -> copyright 2016
[folly.git] / folly / test / DeterministicSchedule.cpp
1 /*
2  * Copyright 2016 Facebook, Inc.
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  *   http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16
17 #include <folly/test/DeterministicSchedule.h>
18 #include <algorithm>
19 #include <list>
20 #include <mutex>
21 #include <random>
22 #include <utility>
23 #include <unordered_map>
24 #include <assert.h>
25
26 namespace folly {
27 namespace test {
28
29 FOLLY_TLS sem_t* DeterministicSchedule::tls_sem;
30 FOLLY_TLS DeterministicSchedule* DeterministicSchedule::tls_sched;
31 FOLLY_TLS unsigned DeterministicSchedule::tls_threadId;
32
33 // access is protected by futexLock
34 static std::unordered_map<detail::Futex<DeterministicAtomic>*,
35                           std::list<std::pair<uint32_t, bool*>>> futexQueues;
36
37 static std::mutex futexLock;
38
39 DeterministicSchedule::DeterministicSchedule(
40     const std::function<int(int)>& scheduler)
41     : scheduler_(scheduler), nextThreadId_(1) {
42   assert(tls_sem == nullptr);
43   assert(tls_sched == nullptr);
44
45   tls_sem = new sem_t;
46   sem_init(tls_sem, 0, 1);
47   sems_.push_back(tls_sem);
48
49   tls_sched = this;
50 }
51
52 DeterministicSchedule::~DeterministicSchedule() {
53   assert(tls_sched == this);
54   assert(sems_.size() == 1);
55   assert(sems_[0] == tls_sem);
56   beforeThreadExit();
57 }
58
59 std::function<int(int)> DeterministicSchedule::uniform(long seed) {
60   auto rand = std::make_shared<std::ranlux48>(seed);
61   return [rand](size_t numActive) {
62     auto dist = std::uniform_int_distribution<int>(0, numActive - 1);
63     return dist(*rand);
64   };
65 }
66
67 struct UniformSubset {
68   UniformSubset(long seed, int subsetSize, int stepsBetweenSelect)
69       : uniform_(DeterministicSchedule::uniform(seed)),
70         subsetSize_(subsetSize),
71         stepsBetweenSelect_(stepsBetweenSelect),
72         stepsLeft_(0) {}
73
74   size_t operator()(size_t numActive) {
75     adjustPermSize(numActive);
76     if (stepsLeft_-- == 0) {
77       stepsLeft_ = stepsBetweenSelect_ - 1;
78       shufflePrefix();
79     }
80     return perm_[uniform_(std::min(numActive, subsetSize_))];
81   }
82
83  private:
84   std::function<int(int)> uniform_;
85   const size_t subsetSize_;
86   const int stepsBetweenSelect_;
87
88   int stepsLeft_;
89   // only the first subsetSize_ is properly randomized
90   std::vector<int> perm_;
91
92   void adjustPermSize(size_t numActive) {
93     if (perm_.size() > numActive) {
94       perm_.erase(std::remove_if(perm_.begin(), perm_.end(), [=](size_t x) {
95         return x >= numActive;
96       }), perm_.end());
97     } else {
98       while (perm_.size() < numActive) {
99         perm_.push_back(perm_.size());
100       }
101     }
102     assert(perm_.size() == numActive);
103   }
104
105   void shufflePrefix() {
106     for (size_t i = 0; i < std::min(perm_.size() - 1, subsetSize_); ++i) {
107       int j = uniform_(perm_.size() - i) + i;
108       std::swap(perm_[i], perm_[j]);
109     }
110   }
111 };
112
113 std::function<int(int)> DeterministicSchedule::uniformSubset(long seed,
114                                                              int n,
115                                                              int m) {
116   auto gen = std::make_shared<UniformSubset>(seed, n, m);
117   return [=](size_t numActive) { return (*gen)(numActive); };
118 }
119
120 void DeterministicSchedule::beforeSharedAccess() {
121   if (tls_sem) {
122     sem_wait(tls_sem);
123   }
124 }
125
126 void DeterministicSchedule::afterSharedAccess() {
127   auto sched = tls_sched;
128   if (!sched) {
129     return;
130   }
131
132   sem_post(sched->sems_[sched->scheduler_(sched->sems_.size())]);
133 }
134
135 int DeterministicSchedule::getRandNumber(int n) {
136   if (tls_sched) {
137     return tls_sched->scheduler_(n);
138   }
139   return std::rand() % n;
140 }
141
142 int DeterministicSchedule::getcpu(unsigned* cpu,
143                                   unsigned* node,
144                                   void* /* unused */) {
145   if (!tls_threadId && tls_sched) {
146     beforeSharedAccess();
147     tls_threadId = tls_sched->nextThreadId_++;
148     afterSharedAccess();
149   }
150   if (cpu) {
151     *cpu = tls_threadId;
152   }
153   if (node) {
154     *node = tls_threadId;
155   }
156   return 0;
157 }
158
159 sem_t* DeterministicSchedule::beforeThreadCreate() {
160   sem_t* s = new sem_t;
161   sem_init(s, 0, 0);
162   beforeSharedAccess();
163   sems_.push_back(s);
164   afterSharedAccess();
165   return s;
166 }
167
168 void DeterministicSchedule::afterThreadCreate(sem_t* sem) {
169   assert(tls_sem == nullptr);
170   assert(tls_sched == nullptr);
171   tls_sem = sem;
172   tls_sched = this;
173   bool started = false;
174   while (!started) {
175     beforeSharedAccess();
176     if (active_.count(std::this_thread::get_id()) == 1) {
177       started = true;
178     }
179     afterSharedAccess();
180   }
181 }
182
183 void DeterministicSchedule::beforeThreadExit() {
184   assert(tls_sched == this);
185   beforeSharedAccess();
186   sems_.erase(std::find(sems_.begin(), sems_.end(), tls_sem));
187   active_.erase(std::this_thread::get_id());
188   if (sems_.size() > 0) {
189     FOLLY_TEST_DSCHED_VLOG("exiting");
190     afterSharedAccess();
191   }
192   sem_destroy(tls_sem);
193   delete tls_sem;
194   tls_sem = nullptr;
195   tls_sched = nullptr;
196 }
197
198 void DeterministicSchedule::join(std::thread& child) {
199   auto sched = tls_sched;
200   if (sched) {
201     bool done = false;
202     while (!done) {
203       beforeSharedAccess();
204       done = !sched->active_.count(child.get_id());
205       if (done) {
206         FOLLY_TEST_DSCHED_VLOG("joined " << std::hex << child.get_id());
207       }
208       afterSharedAccess();
209     }
210   }
211   child.join();
212 }
213
214 void DeterministicSchedule::post(sem_t* sem) {
215   beforeSharedAccess();
216   sem_post(sem);
217   FOLLY_TEST_DSCHED_VLOG("sem_post(" << sem << ")");
218   afterSharedAccess();
219 }
220
221 bool DeterministicSchedule::tryWait(sem_t* sem) {
222   beforeSharedAccess();
223   int rv = sem_trywait(sem);
224   int e = rv == 0 ? 0 : errno;
225   FOLLY_TEST_DSCHED_VLOG("sem_trywait(" << sem << ") = " << rv
226                                         << " errno=" << e);
227   afterSharedAccess();
228   if (rv == 0) {
229     return true;
230   } else {
231     assert(e == EAGAIN);
232     return false;
233   }
234 }
235
236 void DeterministicSchedule::wait(sem_t* sem) {
237   while (!tryWait(sem)) {
238     // we're not busy waiting because this is a deterministic schedule
239   }
240 }
241 }
242 }
243
244 namespace folly {
245 namespace detail {
246
247 using namespace test;
248 using namespace std::chrono;
249
250 template <>
251 FutexResult Futex<DeterministicAtomic>::futexWaitImpl(
252     uint32_t expected,
253     time_point<system_clock>* absSystemTimeout,
254     time_point<steady_clock>* absSteadyTimeout,
255     uint32_t waitMask) {
256   bool hasTimeout = absSystemTimeout != nullptr || absSteadyTimeout != nullptr;
257   bool awoken = false;
258   FutexResult result = FutexResult::AWOKEN;
259
260   DeterministicSchedule::beforeSharedAccess();
261   FOLLY_TEST_DSCHED_VLOG(this << ".futexWait(" << std::hex << expected
262                               << ", .., " << std::hex << waitMask
263                               << ") beginning..");
264   futexLock.lock();
265   if (data == expected) {
266     auto& queue = futexQueues[this];
267     queue.emplace_back(waitMask, &awoken);
268     auto ours = queue.end();
269     ours--;
270     while (!awoken) {
271       futexLock.unlock();
272       DeterministicSchedule::afterSharedAccess();
273       DeterministicSchedule::beforeSharedAccess();
274       futexLock.lock();
275
276       // Simulate spurious wake-ups, timeouts each time with
277       // a 10% probability if we haven't been woken up already
278       if (!awoken && hasTimeout &&
279           DeterministicSchedule::getRandNumber(100) < 10) {
280         assert(futexQueues.count(this) != 0 && &futexQueues[this] == &queue);
281         queue.erase(ours);
282         if (queue.empty()) {
283           futexQueues.erase(this);
284         }
285         // Simulate ETIMEDOUT 90% of the time and other failures
286         // remaining time
287         result = DeterministicSchedule::getRandNumber(100) >= 10
288                      ? FutexResult::TIMEDOUT
289                      : FutexResult::INTERRUPTED;
290         break;
291       }
292     }
293   } else {
294     result = FutexResult::VALUE_CHANGED;
295   }
296   futexLock.unlock();
297
298   char const* resultStr = "?";
299   switch (result) {
300   case FutexResult::AWOKEN:
301     resultStr = "AWOKEN";
302     break;
303   case FutexResult::TIMEDOUT:
304     resultStr = "TIMEDOUT";
305     break;
306   case FutexResult::INTERRUPTED:
307     resultStr = "INTERRUPTED";
308     break;
309   case FutexResult::VALUE_CHANGED:
310     resultStr = "VALUE_CHANGED";
311     break;
312   }
313   FOLLY_TEST_DSCHED_VLOG(this << ".futexWait(" << std::hex << expected
314                               << ", .., " << std::hex << waitMask << ") -> "
315                               << resultStr);
316   DeterministicSchedule::afterSharedAccess();
317   return result;
318 }
319
320 template <>
321 int Futex<DeterministicAtomic>::futexWake(int count, uint32_t wakeMask) {
322   int rv = 0;
323   DeterministicSchedule::beforeSharedAccess();
324   futexLock.lock();
325   if (futexQueues.count(this) > 0) {
326     auto& queue = futexQueues[this];
327     auto iter = queue.begin();
328     while (iter != queue.end() && rv < count) {
329       auto cur = iter++;
330       if ((cur->first & wakeMask) != 0) {
331         *(cur->second) = true;
332         rv++;
333         queue.erase(cur);
334       }
335     }
336     if (queue.empty()) {
337       futexQueues.erase(this);
338     }
339   }
340   futexLock.unlock();
341   FOLLY_TEST_DSCHED_VLOG(this << ".futexWake(" << count << ", " << std::hex
342                               << wakeMask << ") -> " << rv);
343   DeterministicSchedule::afterSharedAccess();
344   return rv;
345 }
346
347 template <>
348 CacheLocality const& CacheLocality::system<test::DeterministicAtomic>() {
349   static CacheLocality cache(CacheLocality::uniform(16));
350   return cache;
351 }
352
353 template <>
354 const AccessSpreader<test::DeterministicAtomic>
355     AccessSpreader<test::DeterministicAtomic>::stripeByCore(
356         CacheLocality::system<>().numCachesByLevel.front());
357
358 template <>
359 const AccessSpreader<test::DeterministicAtomic>
360     AccessSpreader<test::DeterministicAtomic>::stripeByChip(
361         CacheLocality::system<>().numCachesByLevel.back());
362
363 template <>
364 AccessSpreaderArray<test::DeterministicAtomic, 128>
365     AccessSpreaderArray<test::DeterministicAtomic, 128>::sharedInstance = {};
366
367 template <>
368 Getcpu::Func AccessSpreader<test::DeterministicAtomic>::pickGetcpuFunc(
369     size_t /* numStripes */) {
370   return &DeterministicSchedule::getcpu;
371 }
372 }
373 }