fix SIOF in CacheLocality.h's AccessSpreader
[folly.git] / folly / test / DeterministicSchedule.cpp
1 /*
2  * Copyright 2016 Facebook, Inc.
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  *   http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16
17 #include <folly/test/DeterministicSchedule.h>
18 #include <algorithm>
19 #include <list>
20 #include <mutex>
21 #include <random>
22 #include <utility>
23 #include <unordered_map>
24 #include <assert.h>
25
// Invoked at global scope, before namespace folly is opened below, with the
// fully qualified atom type.
// NOTE(review): presumably emits the AccessSpreader definitions required for
// the DeterministicAtomic atom type -- confirm against the macro definition.
DECLARE_ACCESS_SPREADER_TYPE(folly::test::DeterministicAtomic)
27
28 namespace folly {
29 namespace test {
30
31 FOLLY_TLS sem_t* DeterministicSchedule::tls_sem;
32 FOLLY_TLS DeterministicSchedule* DeterministicSchedule::tls_sched;
33 FOLLY_TLS unsigned DeterministicSchedule::tls_threadId;
34
35 // access is protected by futexLock
36 static std::unordered_map<detail::Futex<DeterministicAtomic>*,
37                           std::list<std::pair<uint32_t, bool*>>> futexQueues;
38
39 static std::mutex futexLock;
40
41 DeterministicSchedule::DeterministicSchedule(
42     const std::function<int(int)>& scheduler)
43     : scheduler_(scheduler), nextThreadId_(1) {
44   assert(tls_sem == nullptr);
45   assert(tls_sched == nullptr);
46
47   tls_sem = new sem_t;
48   sem_init(tls_sem, 0, 1);
49   sems_.push_back(tls_sem);
50
51   tls_sched = this;
52 }
53
54 DeterministicSchedule::~DeterministicSchedule() {
55   assert(tls_sched == this);
56   assert(sems_.size() == 1);
57   assert(sems_[0] == tls_sem);
58   beforeThreadExit();
59 }
60
61 std::function<int(int)> DeterministicSchedule::uniform(long seed) {
62   auto rand = std::make_shared<std::ranlux48>(seed);
63   return [rand](size_t numActive) {
64     auto dist = std::uniform_int_distribution<int>(0, numActive - 1);
65     return dist(*rand);
66   };
67 }
68
69 struct UniformSubset {
70   UniformSubset(long seed, int subsetSize, int stepsBetweenSelect)
71       : uniform_(DeterministicSchedule::uniform(seed)),
72         subsetSize_(subsetSize),
73         stepsBetweenSelect_(stepsBetweenSelect),
74         stepsLeft_(0) {}
75
76   size_t operator()(size_t numActive) {
77     adjustPermSize(numActive);
78     if (stepsLeft_-- == 0) {
79       stepsLeft_ = stepsBetweenSelect_ - 1;
80       shufflePrefix();
81     }
82     return perm_[uniform_(std::min(numActive, subsetSize_))];
83   }
84
85  private:
86   std::function<int(int)> uniform_;
87   const size_t subsetSize_;
88   const int stepsBetweenSelect_;
89
90   int stepsLeft_;
91   // only the first subsetSize_ is properly randomized
92   std::vector<int> perm_;
93
94   void adjustPermSize(size_t numActive) {
95     if (perm_.size() > numActive) {
96       perm_.erase(std::remove_if(perm_.begin(),
97                                  perm_.end(),
98                                  [=](size_t x) { return x >= numActive; }),
99                   perm_.end());
100     } else {
101       while (perm_.size() < numActive) {
102         perm_.push_back(perm_.size());
103       }
104     }
105     assert(perm_.size() == numActive);
106   }
107
108   void shufflePrefix() {
109     for (size_t i = 0; i < std::min(perm_.size() - 1, subsetSize_); ++i) {
110       int j = uniform_(perm_.size() - i) + i;
111       std::swap(perm_[i], perm_[j]);
112     }
113   }
114 };
115
116 std::function<int(int)> DeterministicSchedule::uniformSubset(long seed,
117                                                              int n,
118                                                              int m) {
119   auto gen = std::make_shared<UniformSubset>(seed, n, m);
120   return [=](size_t numActive) { return (*gen)(numActive); };
121 }
122
123 void DeterministicSchedule::beforeSharedAccess() {
124   if (tls_sem) {
125     sem_wait(tls_sem);
126   }
127 }
128
129 void DeterministicSchedule::afterSharedAccess() {
130   auto sched = tls_sched;
131   if (!sched) {
132     return;
133   }
134
135   sem_post(sched->sems_[sched->scheduler_(sched->sems_.size())]);
136 }
137
138 int DeterministicSchedule::getRandNumber(int n) {
139   if (tls_sched) {
140     return tls_sched->scheduler_(n);
141   }
142   return std::rand() % n;
143 }
144
145 int DeterministicSchedule::getcpu(unsigned* cpu,
146                                   unsigned* node,
147                                   void* /* unused */) {
148   if (!tls_threadId && tls_sched) {
149     beforeSharedAccess();
150     tls_threadId = tls_sched->nextThreadId_++;
151     afterSharedAccess();
152   }
153   if (cpu) {
154     *cpu = tls_threadId;
155   }
156   if (node) {
157     *node = tls_threadId;
158   }
159   return 0;
160 }
161
162 sem_t* DeterministicSchedule::beforeThreadCreate() {
163   sem_t* s = new sem_t;
164   sem_init(s, 0, 0);
165   beforeSharedAccess();
166   sems_.push_back(s);
167   afterSharedAccess();
168   return s;
169 }
170
171 void DeterministicSchedule::afterThreadCreate(sem_t* sem) {
172   assert(tls_sem == nullptr);
173   assert(tls_sched == nullptr);
174   tls_sem = sem;
175   tls_sched = this;
176   bool started = false;
177   while (!started) {
178     beforeSharedAccess();
179     if (active_.count(std::this_thread::get_id()) == 1) {
180       started = true;
181     }
182     afterSharedAccess();
183   }
184 }
185
186 void DeterministicSchedule::beforeThreadExit() {
187   assert(tls_sched == this);
188   beforeSharedAccess();
189   sems_.erase(std::find(sems_.begin(), sems_.end(), tls_sem));
190   active_.erase(std::this_thread::get_id());
191   if (sems_.size() > 0) {
192     FOLLY_TEST_DSCHED_VLOG("exiting");
193     afterSharedAccess();
194   }
195   sem_destroy(tls_sem);
196   delete tls_sem;
197   tls_sem = nullptr;
198   tls_sched = nullptr;
199 }
200
201 void DeterministicSchedule::join(std::thread& child) {
202   auto sched = tls_sched;
203   if (sched) {
204     bool done = false;
205     while (!done) {
206       beforeSharedAccess();
207       done = !sched->active_.count(child.get_id());
208       if (done) {
209         FOLLY_TEST_DSCHED_VLOG("joined " << std::hex << child.get_id());
210       }
211       afterSharedAccess();
212     }
213   }
214   child.join();
215 }
216
217 void DeterministicSchedule::post(sem_t* sem) {
218   beforeSharedAccess();
219   sem_post(sem);
220   FOLLY_TEST_DSCHED_VLOG("sem_post(" << sem << ")");
221   afterSharedAccess();
222 }
223
224 bool DeterministicSchedule::tryWait(sem_t* sem) {
225   beforeSharedAccess();
226   int rv = sem_trywait(sem);
227   int e = rv == 0 ? 0 : errno;
228   FOLLY_TEST_DSCHED_VLOG("sem_trywait(" << sem << ") = " << rv
229                                         << " errno=" << e);
230   afterSharedAccess();
231   if (rv == 0) {
232     return true;
233   } else {
234     assert(e == EAGAIN);
235     return false;
236   }
237 }
238
239 void DeterministicSchedule::wait(sem_t* sem) {
240   while (!tryWait(sem)) {
241     // we're not busy waiting because this is a deterministic schedule
242   }
243 }
244 }
245 }
246
247 namespace folly {
248 namespace detail {
249
250 using namespace test;
251 using namespace std::chrono;
252
253 template <>
254 FutexResult Futex<DeterministicAtomic>::futexWaitImpl(
255     uint32_t expected,
256     time_point<system_clock>* absSystemTimeout,
257     time_point<steady_clock>* absSteadyTimeout,
258     uint32_t waitMask) {
259   bool hasTimeout = absSystemTimeout != nullptr || absSteadyTimeout != nullptr;
260   bool awoken = false;
261   FutexResult result = FutexResult::AWOKEN;
262
263   DeterministicSchedule::beforeSharedAccess();
264   FOLLY_TEST_DSCHED_VLOG(this << ".futexWait(" << std::hex << expected
265                               << ", .., " << std::hex << waitMask
266                               << ") beginning..");
267   futexLock.lock();
268   if (data == expected) {
269     auto& queue = futexQueues[this];
270     queue.emplace_back(waitMask, &awoken);
271     auto ours = queue.end();
272     ours--;
273     while (!awoken) {
274       futexLock.unlock();
275       DeterministicSchedule::afterSharedAccess();
276       DeterministicSchedule::beforeSharedAccess();
277       futexLock.lock();
278
279       // Simulate spurious wake-ups, timeouts each time with
280       // a 10% probability if we haven't been woken up already
281       if (!awoken && hasTimeout &&
282           DeterministicSchedule::getRandNumber(100) < 10) {
283         assert(futexQueues.count(this) != 0 && &futexQueues[this] == &queue);
284         queue.erase(ours);
285         if (queue.empty()) {
286           futexQueues.erase(this);
287         }
288         // Simulate ETIMEDOUT 90% of the time and other failures
289         // remaining time
290         result = DeterministicSchedule::getRandNumber(100) >= 10
291                      ? FutexResult::TIMEDOUT
292                      : FutexResult::INTERRUPTED;
293         break;
294       }
295     }
296   } else {
297     result = FutexResult::VALUE_CHANGED;
298   }
299   futexLock.unlock();
300
301   char const* resultStr = "?";
302   switch (result) {
303     case FutexResult::AWOKEN:
304       resultStr = "AWOKEN";
305       break;
306     case FutexResult::TIMEDOUT:
307       resultStr = "TIMEDOUT";
308       break;
309     case FutexResult::INTERRUPTED:
310       resultStr = "INTERRUPTED";
311       break;
312     case FutexResult::VALUE_CHANGED:
313       resultStr = "VALUE_CHANGED";
314       break;
315   }
316   FOLLY_TEST_DSCHED_VLOG(this << ".futexWait(" << std::hex << expected
317                               << ", .., " << std::hex << waitMask << ") -> "
318                               << resultStr);
319   DeterministicSchedule::afterSharedAccess();
320   return result;
321 }
322
323 template <>
324 int Futex<DeterministicAtomic>::futexWake(int count, uint32_t wakeMask) {
325   int rv = 0;
326   DeterministicSchedule::beforeSharedAccess();
327   futexLock.lock();
328   if (futexQueues.count(this) > 0) {
329     auto& queue = futexQueues[this];
330     auto iter = queue.begin();
331     while (iter != queue.end() && rv < count) {
332       auto cur = iter++;
333       if ((cur->first & wakeMask) != 0) {
334         *(cur->second) = true;
335         rv++;
336         queue.erase(cur);
337       }
338     }
339     if (queue.empty()) {
340       futexQueues.erase(this);
341     }
342   }
343   futexLock.unlock();
344   FOLLY_TEST_DSCHED_VLOG(this << ".futexWake(" << count << ", " << std::hex
345                               << wakeMask << ") -> " << rv);
346   DeterministicSchedule::afterSharedAccess();
347   return rv;
348 }
349
350 template <>
351 CacheLocality const& CacheLocality::system<test::DeterministicAtomic>() {
352   static CacheLocality cache(CacheLocality::uniform(16));
353   return cache;
354 }
355
356 template <>
357 Getcpu::Func AccessSpreader<test::DeterministicAtomic>::pickGetcpuFunc() {
358   return &DeterministicSchedule::getcpu;
359 }
360 }
361 }