2 * Copyright 2016 Facebook, Inc.
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
8 * http://www.apache.org/licenses/LICENSE-2.0
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
17 #include <folly/test/DeterministicSchedule.h>
23 #include <unordered_map>
// Out-of-class definitions of DeterministicSchedule's FOLLY_TLS static
// members (FOLLY_TLS is presumably a thread-local storage qualifier --
// confirm against its definition): the current thread's semaphore, the
// schedule it is attached to, and its thread id. getcpu() treats a thread id
// of 0 as "not yet assigned".
29 FOLLY_TLS sem_t* DeterministicSchedule::tls_sem;
30 FOLLY_TLS DeterministicSchedule* DeterministicSchedule::tls_sched;
31 FOLLY_TLS unsigned DeterministicSchedule::tls_threadId;
// Wait queues keyed by futex address. Each queue entry pairs a waiter's wait
// mask with a pointer to that waiter's "awoken" flag: futexWaitImpl() appends
// (waitMask, &awoken) and futexWake() sets the flag through the pointer when
// the entry's mask intersects the wake mask.
33 // access is protected by futexLock
34 static std::unordered_map<detail::Futex<DeterministicAtomic>*,
35 std::list<std::pair<uint32_t, bool*>>> futexQueues;
// Mutex named by the comment above as the guard for futexQueues.
37 static std::mutex futexLock;
// Constructs a schedule driven by `scheduler`, a callable taking an int and
// returning an int. afterSharedAccess() passes it sems_.size() and uses the
// result as an index into sems_, so it is expected to return a value in
// [0, argument). Thread ids handed out by getcpu() start at 1.
39 DeterministicSchedule::DeterministicSchedule(
40 const std::function<int(int)>& scheduler)
41 : scheduler_(scheduler), nextThreadId_(1) {
// The constructing thread must not already be attached to a schedule.
42 assert(tls_sem == nullptr);
43 assert(tls_sched == nullptr);
// Initialize this thread's semaphore (pshared = 0, initial value 1) and
// register it as the first entry of sems_.
46 sem_init(tls_sem, 0, 1);
47 sems_.push_back(tls_sem);
// Destructor. Asserts that it runs on a thread attached to this schedule and
// that the only semaphore still registered is that thread's own, i.e. every
// other thread has already removed its semaphore (see beforeThreadExit()).
52 DeterministicSchedule::~DeterministicSchedule() {
53 assert(tls_sched == this);
54 assert(sems_.size() == 1);
55 assert(sems_[0] == tls_sem);
// Returns a scheduler function backed by a std::ranlux48 engine seeded with
// `seed`. The engine is held in a shared_ptr captured by the returned lambda,
// so copies of the returned std::function share one random stream.
59 std::function<int(int)> DeterministicSchedule::uniform(long seed) {
60 auto rand = std::make_shared<std::ranlux48>(seed);
61 return [rand](size_t numActive) {
// Uniform integer distribution over the closed range [0, numActive - 1];
// presumably sampled with *rand and returned -- TODO confirm.
62 auto dist = std::uniform_int_distribution<int>(0, numActive - 1);
// Scheduler functor that picks among a subset of the active indices.
// It maintains perm_, a permutation of [0, numActive), and each call returns
// perm_[k] where k is drawn by uniform_ with argument
// min(numActive, subsetSize_), so only the leading entries of perm_ can be
// chosen. A countdown (stepsLeft_) is reset every stepsBetweenSelect_ calls;
// presumably the prefix is re-shuffled at that point via shufflePrefix() --
// TODO confirm.
67 struct UniformSubset {
// seed: seed for the underlying DeterministicSchedule::uniform() generator.
// subsetSize: upper bound on how many leading perm_ entries are eligible.
// stepsBetweenSelect: number of calls between countdown resets.
68 UniformSubset(long seed, int subsetSize, int stepsBetweenSelect)
69 : uniform_(DeterministicSchedule::uniform(seed)),
70 subsetSize_(subsetSize),
71 stepsBetweenSelect_(stepsBetweenSelect),
// Picks an index for `numActive` candidates. perm_ is first resized to
// exactly numActive entries.
74 size_t operator()(size_t numActive) {
75 adjustPermSize(numActive);
// Post-decrement: the branch is taken when the countdown was already 0.
76 if (stepsLeft_-- == 0) {
77 stepsLeft_ = stepsBetweenSelect_ - 1;
80 return perm_[uniform_(std::min(numActive, subsetSize_))];
// Random source; called as uniform_(n) and used as an index offset, so it is
// expected to return a value in [0, n).
84 std::function<int(int)> uniform_;
85 const size_t subsetSize_;
86 const int stepsBetweenSelect_;
89 // only the first subsetSize_ is properly randomized
90 std::vector<int> perm_;
// Makes perm_ hold exactly numActive entries: when it is too long, entries
// with value >= numActive are removed (erase/remove_if); when it is too
// short, the next unused index (the current size) is appended repeatedly.
92 void adjustPermSize(size_t numActive) {
93 if (perm_.size() > numActive) {
94 perm_.erase(std::remove_if(perm_.begin(),
96 [=](size_t x) { return x >= numActive; }),
99 while (perm_.size() < numActive) {
100 perm_.push_back(perm_.size());
103 assert(perm_.size() == numActive);
// Randomizes the leading entries of perm_: position i is swapped with a
// position j = i + uniform_(perm_.size() - i), i.e. drawn from the not yet
// fixed tail.
// NOTE(review): perm_.size() - 1 is unsigned and would wrap if perm_ were
// empty -- confirm callers never reach this with numActive == 0.
106 void shufflePrefix() {
107 for (size_t i = 0; i < std::min(perm_.size() - 1, subsetSize_); ++i) {
108 int j = uniform_(perm_.size() - i) + i;
109 std::swap(perm_[i], perm_[j]);
// Returns a scheduler function that forwards to a UniformSubset functor
// constructed from (seed, n, m). The functor lives in a shared_ptr that the
// returned lambda captures by copy, so copies of the returned std::function
// share its state.
114 std::function<int(int)> DeterministicSchedule::uniformSubset(long seed,
117 auto gen = std::make_shared<UniformSubset>(seed, n, m);
118 return [=](size_t numActive) { return (*gen)(numActive); };
// Called by the operations in this file immediately before they touch shared
// scheduling state; paired with afterSharedAccess().
// NOTE(review): presumably blocks on the calling thread's tls_sem until the
// scheduler selects it -- confirm against the function body.
121 void DeterministicSchedule::beforeSharedAccess() {
// Hands control to the next thread: asks the schedule's scheduler_ for an
// index given the number of registered semaphores, and posts the semaphore at
// that index in sems_.
127 void DeterministicSchedule::afterSharedAccess() {
// Snapshot of the calling thread's schedule pointer.
128 auto sched = tls_sched;
133 sem_post(sched->sems_[sched->scheduler_(sched->sems_.size())]);
// Returns a pseudo-random number for bound `n`, taken either from the current
// thread's schedule (tls_sched->scheduler_(n)) or from std::rand() % n.
// NOTE(review): the condition choosing between the two is presumably whether
// tls_sched is set -- confirm.
136 int DeterministicSchedule::getRandNumber(int n) {
138 return tls_sched->scheduler_(n);
140 return std::rand() % n;
// getcpu-style callback (its address is returned by pickGetcpuFunc() below)
// that reports the calling thread's schedule-assigned id instead of real
// CPU information. The third parameter is unused.
143 int DeterministicSchedule::getcpu(unsigned* cpu,
145 void* /* unused */) {
// Lazily assign an id the first time a scheduled thread calls this: ids come
// from the schedule's nextThreadId_ counter, taken after beforeSharedAccess().
146 if (!tls_threadId && tls_sched) {
147 beforeSharedAccess();
148 tls_threadId = tls_sched->nextThreadId_++;
// The thread id is reported as the node value.
155 *node = tls_threadId;
// Called before a new thread is created. Heap-allocates the sem_t intended
// for that thread (afterThreadCreate() takes a sem_t* parameter) and then
// enters the shared-access section.
160 sem_t* DeterministicSchedule::beforeThreadCreate() {
161 sem_t* s = new sem_t;
163 beforeSharedAccess();
// Called with the semaphore `sem` prepared for a newly created thread.
// Asserts that the calling thread is not yet attached to any schedule.
169 void DeterministicSchedule::afterThreadCreate(sem_t* sem) {
170 assert(tls_sem == nullptr);
171 assert(tls_sched == nullptr);
// Flag tracking whether the thread has been observed as started.
174 bool started = false;
176 beforeSharedAccess();
// Checks whether the calling thread's id is registered in active_.
177 if (active_.count(std::this_thread::get_id()) == 1) {
// Detaches the calling thread from this schedule before it exits: its
// semaphore is removed from sems_, its id is removed from active_, and the
// semaphore is destroyed.
184 void DeterministicSchedule::beforeThreadExit() {
185 assert(tls_sched == this);
186 beforeSharedAccess();
187 sems_.erase(std::find(sems_.begin(), sems_.end(), tls_sem));
188 active_.erase(std::this_thread::get_id());
// Only when other semaphores remain registered is the exit logged here.
189 if (sems_.size() > 0) {
190 FOLLY_TEST_DSCHED_VLOG("exiting");
193 sem_destroy(tls_sem);
// Joins `child` under the deterministic schedule. Uses the calling thread's
// schedule (tls_sched) and treats the child as done once its id is no longer
// present in that schedule's active_ set.
199 void DeterministicSchedule::join(std::thread& child) {
200 auto sched = tls_sched;
204 beforeSharedAccess();
205 done = !sched->active_.count(child.get_id());
207 FOLLY_TEST_DSCHED_VLOG("joined " << std::hex << child.get_id());
// Scheduled wrapper for posting `sem`: enters the shared-access section and
// logs the operation. The sem_post call itself presumably sits between the
// two visible statements -- TODO confirm.
215 void DeterministicSchedule::post(sem_t* sem) {
216 beforeSharedAccess();
218 FOLLY_TEST_DSCHED_VLOG("sem_post(" << sem << ")");
// Scheduled, non-blocking attempt to decrement `sem` via sem_trywait().
// Returns a bool; wait() loops until it is true.
222 bool DeterministicSchedule::tryWait(sem_t* sem) {
223 beforeSharedAccess();
224 int rv = sem_trywait(sem);
// Capture errno right after the call, before logging can overwrite it;
// 0 stands for "no error".
225 int e = rv == 0 ? 0 : errno;
226 FOLLY_TEST_DSCHED_VLOG("sem_trywait(" << sem << ") = " << rv
// Waits on `sem` by retrying tryWait() until it succeeds. Each tryWait()
// call passes through beforeSharedAccess().
237 void DeterministicSchedule::wait(sem_t* sem) {
238 while (!tryWait(sem)) {
239 // we're not busy waiting because this is a deterministic schedule
// Bring test:: and std::chrono:: names into scope for the definitions below,
// which use time_point, system_clock and steady_clock unqualified.
248 using namespace test;
249 using namespace std::chrono;
// Simulated futex wait for DeterministicAtomic-based futexes.
// If the futex value equals `expected`, the caller is enqueued in
// futexQueues[this] with its wait mask and yields to the deterministic
// scheduler until it is woken or a simulated timeout/interrupt fires;
// otherwise the result is VALUE_CHANGED.
// Either timeout pointer being non-null enables the simulated timeout path.
// Returns one of AWOKEN, TIMEDOUT, INTERRUPTED or VALUE_CHANGED.
252 FutexResult Futex<DeterministicAtomic>::futexWaitImpl(
254 time_point<system_clock>* absSystemTimeout,
255 time_point<steady_clock>* absSteadyTimeout,
257 bool hasTimeout = absSystemTimeout != nullptr || absSteadyTimeout != nullptr;
// Default outcome unless one of the paths below overrides it.
259 FutexResult result = FutexResult::AWOKEN;
261 DeterministicSchedule::beforeSharedAccess();
262 FOLLY_TEST_DSCHED_VLOG(this << ".futexWait(" << std::hex << expected
263 << ", .., " << std::hex << waitMask
266 if (data == expected) {
// Register as a waiter: futexWake() sets `awoken` through the stored pointer.
267 auto& queue = futexQueues[this];
268 queue.emplace_back(waitMask, &awoken);
// NOTE(review): `ours` starts at end(); presumably it is then stepped back to
// refer to the entry just appended -- confirm.
269 auto ours = queue.end();
// Leave and re-enter the shared-access section so other threads get a chance
// to run (and possibly wake us).
273 DeterministicSchedule::afterSharedAccess();
274 DeterministicSchedule::beforeSharedAccess();
277 // Simulate spurious wake-ups, timeouts each time with
278 // a 10% probability if we haven't been woken up already
279 if (!awoken && hasTimeout &&
280 DeterministicSchedule::getRandNumber(100) < 10) {
// Sanity check: our queue must still be the one registered for this futex.
281 assert(futexQueues.count(this) != 0 && &futexQueues[this] == &queue);
// Drops this futex's map entry (presumably only once its queue is empty
// after removing our entry -- confirm).
284 futexQueues.erase(this);
286 // Simulate ETIMEDOUT 90% of the time and other failures
288 result = DeterministicSchedule::getRandNumber(100) >= 10
289 ? FutexResult::TIMEDOUT
290 : FutexResult::INTERRUPTED;
// Outcome for the case where no wait took place (presumably the branch for
// data != expected -- confirm).
295 result = FutexResult::VALUE_CHANGED;
// Human-readable name of `result` for the log line below; "?" if unmatched.
299 char const* resultStr = "?";
301 case FutexResult::AWOKEN:
302 resultStr = "AWOKEN";
304 case FutexResult::TIMEDOUT:
305 resultStr = "TIMEDOUT";
307 case FutexResult::INTERRUPTED:
308 resultStr = "INTERRUPTED";
310 case FutexResult::VALUE_CHANGED:
311 resultStr = "VALUE_CHANGED";
314 FOLLY_TEST_DSCHED_VLOG(this << ".futexWait(" << std::hex << expected
315 << ", .., " << std::hex << waitMask << ") -> "
317 DeterministicSchedule::afterSharedAccess();
// Simulated futex wake. Walks this futex's wait queue in order and marks
// waiters whose wait mask shares a bit with `wakeMask` as awoken, stopping
// once `count` is reached or the queue is exhausted. `rv` is the value logged
// as the result (presumably the number of waiters woken, and the return
// value -- confirm).
322 int Futex<DeterministicAtomic>::futexWake(int count, uint32_t wakeMask) {
324 DeterministicSchedule::beforeSharedAccess();
// count() is used before operator[] so that no empty queue is created for a
// futex that has no waiters.
326 if (futexQueues.count(this) > 0) {
327 auto& queue = futexQueues[this];
328 auto iter = queue.begin();
329 while (iter != queue.end() && rv < count) {
// Wake only waiters whose mask intersects the wake mask; setting the flag
// through the stored pointer is what futexWaitImpl() observes as `awoken`.
331 if ((cur->first & wakeMask) != 0) {
332 *(cur->second) = true;
// Drops this futex's map entry (presumably only when its queue has become
// empty -- confirm).
338 futexQueues.erase(this);
342 FOLLY_TEST_DSCHED_VLOG(this << ".futexWake(" << count << ", " << std::hex
343 << wakeMask << ") -> " << rv);
344 DeterministicSchedule::afterSharedAccess();
// CacheLocality::system() for the test::DeterministicAtomic template
// argument: a function-local static built once from
// CacheLocality::uniform(16), rather than being probed from real hardware.
349 CacheLocality const& CacheLocality::system<test::DeterministicAtomic>() {
350 static CacheLocality cache(CacheLocality::uniform(16));
// Definition of the stripeByCore static for the DeterministicAtomic
// specialization, constructed with the first entry of numCachesByLevel from
// CacheLocality::system<>().
355 const AccessSpreader<test::DeterministicAtomic>
356 AccessSpreader<test::DeterministicAtomic>::stripeByCore(
357 CacheLocality::system<>().numCachesByLevel.front());
// Definition of the stripeByChip static for the DeterministicAtomic
// specialization, constructed with the last entry of numCachesByLevel from
// CacheLocality::system<>().
360 const AccessSpreader<test::DeterministicAtomic>
361 AccessSpreader<test::DeterministicAtomic>::stripeByChip(
362 CacheLocality::system<>().numCachesByLevel.back());
// Definition of the shared AccessSpreaderArray instance for
// DeterministicAtomic with 128 as the second template argument, initialized
// with an empty brace list.
365 AccessSpreaderArray<test::DeterministicAtomic, 128>
366 AccessSpreaderArray<test::DeterministicAtomic, 128>::sharedInstance = {};
// Selects the getcpu implementation for DeterministicAtomic-based spreaders:
// always DeterministicSchedule::getcpu, regardless of the (unused) stripe
// count.
369 Getcpu::Func AccessSpreader<test::DeterministicAtomic>::pickGetcpuFunc(
370 size_t /* numStripes */) {
371 return &DeterministicSchedule::getcpu;