fix flaky ConnectTFOTimeout and ConnectTFOFallbackTimeout tests
[folly.git] / folly / test / DeterministicSchedule.cpp
1 /*
2  * Copyright 2016 Facebook, Inc.
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  *   http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16
17 #include <folly/test/DeterministicSchedule.h>
18
19 #include <assert.h>
20
21 #include <algorithm>
22 #include <list>
23 #include <mutex>
24 #include <random>
25 #include <unordered_map>
26 #include <utility>
27
28 #include <folly/Random.h>
29
30 namespace folly {
31 namespace test {
32
33 FOLLY_TLS sem_t* DeterministicSchedule::tls_sem;
34 FOLLY_TLS DeterministicSchedule* DeterministicSchedule::tls_sched;
35 FOLLY_TLS unsigned DeterministicSchedule::tls_threadId;
36
37 // access is protected by futexLock
38 static std::unordered_map<detail::Futex<DeterministicAtomic>*,
39                           std::list<std::pair<uint32_t, bool*>>> futexQueues;
40
41 static std::mutex futexLock;
42
43 DeterministicSchedule::DeterministicSchedule(
44     const std::function<int(int)>& scheduler)
45     : scheduler_(scheduler), nextThreadId_(1) {
46   assert(tls_sem == nullptr);
47   assert(tls_sched == nullptr);
48
49   tls_sem = new sem_t;
50   sem_init(tls_sem, 0, 1);
51   sems_.push_back(tls_sem);
52
53   tls_sched = this;
54 }
55
56 DeterministicSchedule::~DeterministicSchedule() {
57   assert(tls_sched == this);
58   assert(sems_.size() == 1);
59   assert(sems_[0] == tls_sem);
60   beforeThreadExit();
61 }
62
63 std::function<int(int)> DeterministicSchedule::uniform(long seed) {
64   auto rand = std::make_shared<std::ranlux48>(seed);
65   return [rand](size_t numActive) {
66     auto dist = std::uniform_int_distribution<int>(0, numActive - 1);
67     return dist(*rand);
68   };
69 }
70
71 struct UniformSubset {
72   UniformSubset(long seed, int subsetSize, int stepsBetweenSelect)
73       : uniform_(DeterministicSchedule::uniform(seed)),
74         subsetSize_(subsetSize),
75         stepsBetweenSelect_(stepsBetweenSelect),
76         stepsLeft_(0) {}
77
78   size_t operator()(size_t numActive) {
79     adjustPermSize(numActive);
80     if (stepsLeft_-- == 0) {
81       stepsLeft_ = stepsBetweenSelect_ - 1;
82       shufflePrefix();
83     }
84     return perm_[uniform_(std::min(numActive, subsetSize_))];
85   }
86
87  private:
88   std::function<int(int)> uniform_;
89   const size_t subsetSize_;
90   const int stepsBetweenSelect_;
91
92   int stepsLeft_;
93   // only the first subsetSize_ is properly randomized
94   std::vector<int> perm_;
95
96   void adjustPermSize(size_t numActive) {
97     if (perm_.size() > numActive) {
98       perm_.erase(std::remove_if(perm_.begin(),
99                                  perm_.end(),
100                                  [=](size_t x) { return x >= numActive; }),
101                   perm_.end());
102     } else {
103       while (perm_.size() < numActive) {
104         perm_.push_back(perm_.size());
105       }
106     }
107     assert(perm_.size() == numActive);
108   }
109
110   void shufflePrefix() {
111     for (size_t i = 0; i < std::min(perm_.size() - 1, subsetSize_); ++i) {
112       int j = uniform_(perm_.size() - i) + i;
113       std::swap(perm_[i], perm_[j]);
114     }
115   }
116 };
117
118 std::function<int(int)> DeterministicSchedule::uniformSubset(long seed,
119                                                              int n,
120                                                              int m) {
121   auto gen = std::make_shared<UniformSubset>(seed, n, m);
122   return [=](size_t numActive) { return (*gen)(numActive); };
123 }
124
125 void DeterministicSchedule::beforeSharedAccess() {
126   if (tls_sem) {
127     sem_wait(tls_sem);
128   }
129 }
130
131 void DeterministicSchedule::afterSharedAccess() {
132   auto sched = tls_sched;
133   if (!sched) {
134     return;
135   }
136
137   sem_post(sched->sems_[sched->scheduler_(sched->sems_.size())]);
138 }
139
140 int DeterministicSchedule::getRandNumber(int n) {
141   if (tls_sched) {
142     return tls_sched->scheduler_(n);
143   }
144   return Random::rand32() % n;
145 }
146
147 int DeterministicSchedule::getcpu(unsigned* cpu,
148                                   unsigned* node,
149                                   void* /* unused */) {
150   if (!tls_threadId && tls_sched) {
151     beforeSharedAccess();
152     tls_threadId = tls_sched->nextThreadId_++;
153     afterSharedAccess();
154   }
155   if (cpu) {
156     *cpu = tls_threadId;
157   }
158   if (node) {
159     *node = tls_threadId;
160   }
161   return 0;
162 }
163
164 sem_t* DeterministicSchedule::beforeThreadCreate() {
165   sem_t* s = new sem_t;
166   sem_init(s, 0, 0);
167   beforeSharedAccess();
168   sems_.push_back(s);
169   afterSharedAccess();
170   return s;
171 }
172
173 void DeterministicSchedule::afterThreadCreate(sem_t* sem) {
174   assert(tls_sem == nullptr);
175   assert(tls_sched == nullptr);
176   tls_sem = sem;
177   tls_sched = this;
178   bool started = false;
179   while (!started) {
180     beforeSharedAccess();
181     if (active_.count(std::this_thread::get_id()) == 1) {
182       started = true;
183     }
184     afterSharedAccess();
185   }
186 }
187
188 void DeterministicSchedule::beforeThreadExit() {
189   assert(tls_sched == this);
190   beforeSharedAccess();
191   sems_.erase(std::find(sems_.begin(), sems_.end(), tls_sem));
192   active_.erase(std::this_thread::get_id());
193   if (sems_.size() > 0) {
194     FOLLY_TEST_DSCHED_VLOG("exiting");
195     afterSharedAccess();
196   }
197   sem_destroy(tls_sem);
198   delete tls_sem;
199   tls_sem = nullptr;
200   tls_sched = nullptr;
201 }
202
203 void DeterministicSchedule::join(std::thread& child) {
204   auto sched = tls_sched;
205   if (sched) {
206     bool done = false;
207     while (!done) {
208       beforeSharedAccess();
209       done = !sched->active_.count(child.get_id());
210       if (done) {
211         FOLLY_TEST_DSCHED_VLOG("joined " << std::hex << child.get_id());
212       }
213       afterSharedAccess();
214     }
215   }
216   child.join();
217 }
218
219 void DeterministicSchedule::post(sem_t* sem) {
220   beforeSharedAccess();
221   sem_post(sem);
222   FOLLY_TEST_DSCHED_VLOG("sem_post(" << sem << ")");
223   afterSharedAccess();
224 }
225
226 bool DeterministicSchedule::tryWait(sem_t* sem) {
227   beforeSharedAccess();
228   int rv = sem_trywait(sem);
229   int e = rv == 0 ? 0 : errno;
230   FOLLY_TEST_DSCHED_VLOG("sem_trywait(" << sem << ") = " << rv
231                                         << " errno=" << e);
232   afterSharedAccess();
233   if (rv == 0) {
234     return true;
235   } else {
236     assert(e == EAGAIN);
237     return false;
238   }
239 }
240
241 void DeterministicSchedule::wait(sem_t* sem) {
242   while (!tryWait(sem)) {
243     // we're not busy waiting because this is a deterministic schedule
244   }
245 }
246 }
247 }
248
249 namespace folly {
250 namespace detail {
251
252 using namespace test;
253 using namespace std::chrono;
254
255 template <>
256 FutexResult Futex<DeterministicAtomic>::futexWaitImpl(
257     uint32_t expected,
258     time_point<system_clock>* absSystemTimeout,
259     time_point<steady_clock>* absSteadyTimeout,
260     uint32_t waitMask) {
261   bool hasTimeout = absSystemTimeout != nullptr || absSteadyTimeout != nullptr;
262   bool awoken = false;
263   FutexResult result = FutexResult::AWOKEN;
264
265   DeterministicSchedule::beforeSharedAccess();
266   FOLLY_TEST_DSCHED_VLOG(this << ".futexWait(" << std::hex << expected
267                               << ", .., " << std::hex << waitMask
268                               << ") beginning..");
269   futexLock.lock();
270   if (data == expected) {
271     auto& queue = futexQueues[this];
272     queue.emplace_back(waitMask, &awoken);
273     auto ours = queue.end();
274     ours--;
275     while (!awoken) {
276       futexLock.unlock();
277       DeterministicSchedule::afterSharedAccess();
278       DeterministicSchedule::beforeSharedAccess();
279       futexLock.lock();
280
281       // Simulate spurious wake-ups, timeouts each time with
282       // a 10% probability if we haven't been woken up already
283       if (!awoken && hasTimeout &&
284           DeterministicSchedule::getRandNumber(100) < 10) {
285         assert(futexQueues.count(this) != 0 && &futexQueues[this] == &queue);
286         queue.erase(ours);
287         if (queue.empty()) {
288           futexQueues.erase(this);
289         }
290         // Simulate ETIMEDOUT 90% of the time and other failures
291         // remaining time
292         result = DeterministicSchedule::getRandNumber(100) >= 10
293                      ? FutexResult::TIMEDOUT
294                      : FutexResult::INTERRUPTED;
295         break;
296       }
297     }
298   } else {
299     result = FutexResult::VALUE_CHANGED;
300   }
301   futexLock.unlock();
302
303   char const* resultStr = "?";
304   switch (result) {
305     case FutexResult::AWOKEN:
306       resultStr = "AWOKEN";
307       break;
308     case FutexResult::TIMEDOUT:
309       resultStr = "TIMEDOUT";
310       break;
311     case FutexResult::INTERRUPTED:
312       resultStr = "INTERRUPTED";
313       break;
314     case FutexResult::VALUE_CHANGED:
315       resultStr = "VALUE_CHANGED";
316       break;
317   }
318   FOLLY_TEST_DSCHED_VLOG(this << ".futexWait(" << std::hex << expected
319                               << ", .., " << std::hex << waitMask << ") -> "
320                               << resultStr);
321   DeterministicSchedule::afterSharedAccess();
322   return result;
323 }
324
325 template <>
326 int Futex<DeterministicAtomic>::futexWake(int count, uint32_t wakeMask) {
327   int rv = 0;
328   DeterministicSchedule::beforeSharedAccess();
329   futexLock.lock();
330   if (futexQueues.count(this) > 0) {
331     auto& queue = futexQueues[this];
332     auto iter = queue.begin();
333     while (iter != queue.end() && rv < count) {
334       auto cur = iter++;
335       if ((cur->first & wakeMask) != 0) {
336         *(cur->second) = true;
337         rv++;
338         queue.erase(cur);
339       }
340     }
341     if (queue.empty()) {
342       futexQueues.erase(this);
343     }
344   }
345   futexLock.unlock();
346   FOLLY_TEST_DSCHED_VLOG(this << ".futexWake(" << count << ", " << std::hex
347                               << wakeMask << ") -> " << rv);
348   DeterministicSchedule::afterSharedAccess();
349   return rv;
350 }
351
352 template <>
353 CacheLocality const& CacheLocality::system<test::DeterministicAtomic>() {
354   static CacheLocality cache(CacheLocality::uniform(16));
355   return cache;
356 }
357
358 template <>
359 Getcpu::Func AccessSpreader<test::DeterministicAtomic>::pickGetcpuFunc() {
360   return &DeterministicSchedule::getcpu;
361 }
362 }
363 }