// folly/test/DeterministicSchedule.cpp
1 /*
2  * Copyright 2013 Facebook, Inc.
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  *   http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16
17 #include "DeterministicSchedule.h"
18 #include <algorithm>
19 #include <list>
20 #include <mutex>
21 #include <random>
22 #include <utility>
23 #include <unordered_map>
24 #include <assert.h>
25
26 namespace folly { namespace test {
27
// Thread-local semaphore for the calling thread's turn in the schedule;
// nullptr when the thread is not under deterministic control.
__thread sem_t* DeterministicSchedule::tls_sem;
// Thread-local pointer to the schedule the calling thread belongs to.
__thread DeterministicSchedule* DeterministicSchedule::tls_sched;

// Wait queues for the deterministic futex emulation, keyed by futex
// address.  Each entry pairs the waiter's waitMask with a pointer to a
// bool on the waiter's stack that the waker flips to release it.
// access is protected by futexLock
static std::unordered_map<detail::Futex<DeterministicAtomic>*,
                          std::list<std::pair<uint32_t,bool*>>> futexQueues;

static std::mutex futexLock;
36
37 DeterministicSchedule::DeterministicSchedule(
38         const std::function<int(int)>& scheduler)
39   : scheduler_(scheduler)
40 {
41   assert(tls_sem == nullptr);
42   assert(tls_sched == nullptr);
43
44   tls_sem = new sem_t;
45   sem_init(tls_sem, 0, 1);
46   sems_.push_back(tls_sem);
47
48   tls_sched = this;
49 }
50
// Tears down the schedule.  May only run on the thread that created it,
// after every other participating thread has exited (sems_ must hold
// exactly this thread's semaphore).  beforeThreadExit() releases the
// semaphore and clears the thread-local state.
DeterministicSchedule::~DeterministicSchedule() {
  assert(tls_sched == this);
  assert(sems_.size() == 1);
  assert(sems_[0] == tls_sem);
  beforeThreadExit();
}
57
58 std::function<int(int)>
59 DeterministicSchedule::uniform(long seed) {
60   auto rand = std::make_shared<std::ranlux48>(seed);
61   return [rand](int numActive) {
62     auto dist = std::uniform_int_distribution<int>(0, numActive - 1);
63     return dist(*rand);
64   };
65 }
66
67 struct UniformSubset {
68   UniformSubset(long seed, int subsetSize, int stepsBetweenSelect)
69     : uniform_(DeterministicSchedule::uniform(seed))
70     , subsetSize_(subsetSize)
71     , stepsBetweenSelect_(stepsBetweenSelect)
72     , stepsLeft_(0)
73   {
74   }
75
76   int operator()(int numActive) {
77     adjustPermSize(numActive);
78     if (stepsLeft_-- == 0) {
79       stepsLeft_ = stepsBetweenSelect_ - 1;
80       shufflePrefix();
81     }
82     return perm_[uniform_(std::min(numActive, subsetSize_))];
83   }
84
85  private:
86   std::function<int(int)> uniform_;
87   const int subsetSize_;
88   const int stepsBetweenSelect_;
89
90   int stepsLeft_;
91   // only the first subsetSize_ is properly randomized
92   std::vector<int> perm_;
93
94   void adjustPermSize(int numActive) {
95     if (perm_.size() > numActive) {
96       perm_.erase(std::remove_if(perm_.begin(), perm_.end(),
97               [=](int x){ return x >= numActive; }), perm_.end());
98     } else {
99       while (perm_.size() < numActive) {
100         perm_.push_back(perm_.size());
101       }
102     }
103     assert(perm_.size() == numActive);
104   }
105
106   void shufflePrefix() {
107     for (int i = 0; i < std::min(int(perm_.size() - 1), subsetSize_); ++i) {
108       int j = uniform_(perm_.size() - i) + i;
109       std::swap(perm_[i], perm_[j]);
110     }
111   }
112 };
113
114 std::function<int(int)>
115 DeterministicSchedule::uniformSubset(long seed, int n, int m) {
116   auto gen = std::make_shared<UniformSubset>(seed, n, m);
117   return [=](int numActive) { return (*gen)(numActive); };
118 }
119
120 void
121 DeterministicSchedule::beforeSharedAccess() {
122   if (tls_sem) {
123     sem_wait(tls_sem);
124   }
125 }
126
127 void
128 DeterministicSchedule::afterSharedAccess() {
129   auto sched = tls_sched;
130   if (!sched) {
131     return;
132   }
133
134   sem_post(sched->sems_[sched->scheduler_(sched->sems_.size())]);
135 }
136
137 sem_t*
138 DeterministicSchedule::beforeThreadCreate() {
139   sem_t* s = new sem_t;
140   sem_init(s, 0, 0);
141   beforeSharedAccess();
142   sems_.push_back(s);
143   afterSharedAccess();
144   return s;
145 }
146
147 void
148 DeterministicSchedule::afterThreadCreate(sem_t* sem) {
149   assert(tls_sem == nullptr);
150   assert(tls_sched == nullptr);
151   tls_sem = sem;
152   tls_sched = this;
153   bool started = false;
154   while (!started) {
155     beforeSharedAccess();
156     if (active_.count(std::this_thread::get_id()) == 1) {
157       started = true;
158     }
159     afterSharedAccess();
160   }
161 }
162
// Detaches the calling thread from the schedule.  Takes its own turn
// via beforeSharedAccess(), removes this thread's semaphore and active
// entry, and passes the baton on -- unless this was the last thread,
// in which case there is no semaphore left to post, so
// afterSharedAccess() must be skipped.  Finally frees the semaphore
// and clears the thread-local state.
void
DeterministicSchedule::beforeThreadExit() {
  assert(tls_sched == this);
  beforeSharedAccess();
  sems_.erase(std::find(sems_.begin(), sems_.end(), tls_sem));
  active_.erase(std::this_thread::get_id());
  if (sems_.size() > 0) {
    // hand control to one of the surviving threads
    afterSharedAccess();
  }
  sem_destroy(tls_sem);
  delete tls_sem;
  tls_sem = nullptr;
  tls_sched = nullptr;
}
177
178 void
179 DeterministicSchedule::join(std::thread& child) {
180   auto sched = tls_sched;
181   if (sched) {
182     bool done = false;
183     while (!done) {
184       beforeSharedAccess();
185       done = !sched->active_.count(child.get_id());
186       afterSharedAccess();
187     }
188   }
189   child.join();
190 }
191
// Deterministic replacement for sem_post: performs the post while
// holding this thread's scheduler turn, so the resulting wakeup order
// is controlled by the schedule.
void
DeterministicSchedule::post(sem_t* sem) {
  beforeSharedAccess();
  sem_post(sem);
  afterSharedAccess();
}
198
199 bool
200 DeterministicSchedule::tryWait(sem_t* sem) {
201   beforeSharedAccess();
202   int rv = sem_trywait(sem);
203   afterSharedAccess();
204   if (rv == 0) {
205     return true;
206   } else {
207     assert(errno == EAGAIN);
208     return false;
209   }
210 }
211
212 void
213 DeterministicSchedule::wait(sem_t* sem) {
214   while (!tryWait(sem)) {
215     // we're not busy waiting because this is a deterministic schedule
216   }
217 }
218
219 }}
220
221 namespace folly { namespace detail {
222
223 using namespace test;
224
// Deterministic emulation of futexWait.  Returns false immediately if
// the futex word no longer equals `expected`; otherwise enqueues this
// thread on the futex's wait queue and loops in scheduler steps until
// a futexWake with an overlapping mask sets its `done` flag.
// futexLock protects futexQueues and is released around each
// afterSharedAccess()/beforeSharedAccess() pair so other threads can
// run (and eventually wake us) while we wait.
template<>
bool Futex<DeterministicAtomic>::futexWait(uint32_t expected,
                                           uint32_t waitMask) {
  bool rv;
  DeterministicSchedule::beforeSharedAccess();
  futexLock.lock();
  if (data != expected) {
    // value already changed; caller should re-check its condition
    rv = false;
  } else {
    auto& queue = futexQueues[this];
    bool done = false;
    // `done` lives on our stack; the waker flips it through the queued
    // pointer while holding futexLock
    queue.push_back(std::make_pair(waitMask, &done));
    while (!done) {
      futexLock.unlock();
      DeterministicSchedule::afterSharedAccess();
      DeterministicSchedule::beforeSharedAccess();
      futexLock.lock();
    }
    rv = true;
  }
  futexLock.unlock();
  DeterministicSchedule::afterSharedAccess();
  return rv;
}
249
250 template<>
251 int Futex<DeterministicAtomic>::futexWake(int count, uint32_t wakeMask) {
252   int rv = 0;
253   DeterministicSchedule::beforeSharedAccess();
254   futexLock.lock();
255   if (futexQueues.count(this) > 0) {
256     auto& queue = futexQueues[this];
257     auto iter = queue.begin();
258     while (iter != queue.end() && rv < count) {
259       auto cur = iter++;
260       if ((cur->first & wakeMask) != 0) {
261         *(cur->second) = true;
262         rv++;
263         queue.erase(cur);
264       }
265     }
266     if (queue.empty()) {
267       futexQueues.erase(this);
268     }
269   }
270   futexLock.unlock();
271   DeterministicSchedule::afterSharedAccess();
272   return rv;
273 }
274
275
// Under DeterministicAtomic, report a fixed uniform 16-way topology so
// test behavior does not depend on the host machine's actual cache
// hierarchy.  Function-local static: constructed on first use.
template<>
CacheLocality const& CacheLocality::system<test::DeterministicAtomic>() {
  static CacheLocality cache(CacheLocality::uniform(16));
  return cache;
}
281
// Static storage for the thread-id and access-spreading machinery,
// specialized for the deterministic atomics used by the tests.

// counter from which SequentialThreadId hands out ids
template<>
test::DeterministicAtomic<size_t>
    SequentialThreadId<test::DeterministicAtomic>::prevId(0);

// this thread's slot for its SequentialThreadId value
template<>
__thread size_t SequentialThreadId<test::DeterministicAtomic>::currentId(0);

// NOTE(review): these spreaders size themselves from the default
// CacheLocality::system<>() (real atomics), not the deterministic
// 16-way specialization above -- confirm that is intentional.
template<>
const AccessSpreader<test::DeterministicAtomic>
AccessSpreader<test::DeterministicAtomic>::stripeByCore(
    CacheLocality::system<>().numCachesByLevel.front());

template<>
const AccessSpreader<test::DeterministicAtomic>
AccessSpreader<test::DeterministicAtomic>::stripeByChip(
    CacheLocality::system<>().numCachesByLevel.back());

template<>
AccessSpreaderArray<test::DeterministicAtomic,128>
AccessSpreaderArray<test::DeterministicAtomic,128>::sharedInstance = {};
302
303
// For deterministic tests, answer "which cpu am I on?" with the
// sequential thread id instead of the real getcpu, making stripe
// selection reproducible across runs.  numStripes is unused here.
template<>
Getcpu::Func
AccessSpreader<test::DeterministicAtomic>::pickGetcpuFunc(size_t numStripes) {
  return &SequentialThreadId<test::DeterministicAtomic>::getcpu;
}
309
310 }}