Multi-producer multi-consumer queue with optional blocking
[folly.git] / folly / test / DeterministicSchedule.h
1 /*
2  * Copyright 2013 Facebook, Inc.
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  *   http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16
17 #pragma once
18
19 #include <atomic>
20 #include <functional>
21 #include <thread>
22 #include <unordered_set>
23 #include <vector>
24 #include <boost/noncopyable.hpp>
25 #include <semaphore.h>
26 #include <errno.h>
27 #include <assert.h>
28
29 #include <folly/ScopeGuard.h>
30 #include <folly/detail/Futex.h>
31
32 namespace folly { namespace test {
33
34 /**
35  * DeterministicSchedule coordinates the inter-thread communication of a
36  * set of threads under test, so that despite concurrency the execution is
37  * the same every time.  It works by stashing a reference to the schedule
38  * in a thread-local variable, then blocking all but one thread at a time.
39  *
40  * In order for DeterministicSchedule to work, it needs to intercept
41  * all inter-thread communication.  To do this you should use
42  * DeterministicAtomic<T> instead of std::atomic<T>, create threads
43  * using DeterministicSchedule::thread() instead of the std::thread
44  * constructor, DeterministicSchedule::join(thr) instead of thr.join(),
45  * and access semaphores via the helper functions in DeterministicSchedule.
46  * Locks are not yet supported, although they would be easy to add with
47  * the same strategy as the mapping of sem_wait.
48  *
49  * The actual schedule is defined by a function from n -> [0,n). At
50  * each step, the function will be given the number of active threads
51  * (n), and it returns the index of the thread that should be run next.
52  * Invocations of the scheduler function will be serialized, but will
53  * occur from multiple threads.  A good starting schedule is uniform(0).
54  */
55 class DeterministicSchedule : boost::noncopyable {
56  public:
57   /**
58    * Arranges for the current thread (and all threads created by
59    * DeterministicSchedule::thread on a thread participating in this
60    * schedule) to participate in a deterministic schedule.
61    */
62   explicit DeterministicSchedule(const std::function<int(int)>& scheduler);
63
64   /** Completes the schedule. */
65   ~DeterministicSchedule();
66
67   /**
68    * Returns a scheduling function that randomly chooses one of the
69    * runnable threads at each step, with no history.  This implements
70    * a schedule that is equivalent to one in which the steps between
71    * inter-thread communication are random variables following a poisson
72    * distribution.
73    */
74   static std::function<int(int)> uniform(long seed);
75
76   /**
77    * Returns a scheduling function that chooses a subset of the active
78    * threads and randomly chooses a member of the subset as the next
79    * runnable thread.  The subset is chosen with size n, and the choice
80    * is made every m steps.
81    */
82   static std::function<int(int)> uniformSubset(long seed, int n = 2,
83                                                int m = 64);
84
85   /** Obtains permission for the current thread to perform inter-thread
86    *  communication. */
87   static void beforeSharedAccess();
88
89   /** Releases permission for the current thread to perform inter-thread
90    *  communication. */
91   static void afterSharedAccess();
92
93   /** Launches a thread that will participate in the same deterministic
94    *  schedule as the current thread. */
95   template <typename Func, typename... Args>
96   static inline std::thread thread(Func&& func, Args&&... args) {
97     // TODO: maybe future versions of gcc will allow forwarding to thread
98     auto sched = tls_sched;
99     auto sem = sched ? sched->beforeThreadCreate() : nullptr;
100     auto child = std::thread([=](Args... a) {
101       if (sched) sched->afterThreadCreate(sem);
102       SCOPE_EXIT { if (sched) sched->beforeThreadExit(); };
103       func(a...);
104     }, args...);
105     if (sched) {
106       beforeSharedAccess();
107       sched->active_.insert(child.get_id());
108       afterSharedAccess();
109     }
110     return child;
111   }
112
113   /** Calls child.join() as part of a deterministic schedule. */
114   static void join(std::thread& child);
115
116   /** Calls sem_post(sem) as part of a deterministic schedule. */
117   static void post(sem_t* sem);
118
119   /** Calls sem_trywait(sem) as part of a deterministic schedule, returning
120    *  true on success and false on transient failure. */
121   static bool tryWait(sem_t* sem);
122
123   /** Calls sem_wait(sem) as part of a deterministic schedule. */
124   static void wait(sem_t* sem);
125
126  private:
127   static __thread sem_t* tls_sem;
128   static __thread DeterministicSchedule* tls_sched;
129
130   std::function<int(int)> scheduler_;
131   std::vector<sem_t*> sems_;
132   std::unordered_set<std::thread::id> active_;
133
134   sem_t* beforeThreadCreate();
135   void afterThreadCreate(sem_t*);
136   void beforeThreadExit();
137 };
138
139
140 /**
141  * DeterministicAtomic<T> is a drop-in replacement std::atomic<T> that
142  * cooperates with DeterministicSchedule.
143  */
144 template <typename T>
145 struct DeterministicAtomic {
146   std::atomic<T> data;
147
148   DeterministicAtomic() = default;
149   ~DeterministicAtomic() = default;
150   DeterministicAtomic(DeterministicAtomic<T> const &) = delete;
151   DeterministicAtomic<T>& operator= (DeterministicAtomic<T> const &) = delete;
152
153   constexpr /* implicit */ DeterministicAtomic(T v) noexcept : data(v) {}
154
155   bool is_lock_free() const noexcept {
156     return data.is_lock_free();
157   }
158
159   bool compare_exchange_strong(
160           T& v0, T v1,
161           std::memory_order mo = std::memory_order_seq_cst) noexcept {
162     DeterministicSchedule::beforeSharedAccess();
163     bool rv = data.compare_exchange_strong(v0, v1, mo);
164     DeterministicSchedule::afterSharedAccess();
165     return rv;
166   }
167
168   bool compare_exchange_weak(
169           T& v0, T v1,
170           std::memory_order mo = std::memory_order_seq_cst) noexcept {
171     DeterministicSchedule::beforeSharedAccess();
172     bool rv = data.compare_exchange_weak(v0, v1, mo);
173     DeterministicSchedule::afterSharedAccess();
174     return rv;
175   }
176
177   T exchange(T v, std::memory_order mo = std::memory_order_seq_cst) noexcept {
178     DeterministicSchedule::beforeSharedAccess();
179     T rv = data.exchange(v, mo);
180     DeterministicSchedule::afterSharedAccess();
181     return rv;
182   }
183
184   /* implicit */ operator T () const noexcept {
185     DeterministicSchedule::beforeSharedAccess();
186     T rv = data;
187     DeterministicSchedule::afterSharedAccess();
188     return rv;
189   }
190
191   T load(std::memory_order mo = std::memory_order_seq_cst) const noexcept {
192     DeterministicSchedule::beforeSharedAccess();
193     T rv = data.load(mo);
194     DeterministicSchedule::afterSharedAccess();
195     return rv;
196   }
197
198   T operator= (T v) noexcept {
199     DeterministicSchedule::beforeSharedAccess();
200     T rv = (data = v);
201     DeterministicSchedule::afterSharedAccess();
202     return rv;
203   }
204
205   void store(T v, std::memory_order mo = std::memory_order_seq_cst) noexcept {
206     DeterministicSchedule::beforeSharedAccess();
207     data.store(v, mo);
208     DeterministicSchedule::afterSharedAccess();
209   }
210
211   T operator++ () noexcept {
212     DeterministicSchedule::beforeSharedAccess();
213     T rv = ++data;
214     DeterministicSchedule::afterSharedAccess();
215     return rv;
216   }
217
218   T operator++ (int postDummy) noexcept {
219     DeterministicSchedule::beforeSharedAccess();
220     T rv = data++;
221     DeterministicSchedule::afterSharedAccess();
222     return rv;
223   }
224
225   T operator-- () noexcept {
226     DeterministicSchedule::beforeSharedAccess();
227     T rv = --data;
228     DeterministicSchedule::afterSharedAccess();
229     return rv;
230   }
231
232   T operator-- (int postDummy) noexcept {
233     DeterministicSchedule::beforeSharedAccess();
234     T rv = data--;
235     DeterministicSchedule::afterSharedAccess();
236     return rv;
237   }
238
239   T operator+= (T v) noexcept {
240     DeterministicSchedule::beforeSharedAccess();
241     T rv = (data += v);
242     DeterministicSchedule::afterSharedAccess();
243     return rv;
244   }
245
246   T operator-= (T v) noexcept {
247     DeterministicSchedule::beforeSharedAccess();
248     T rv = (data -= v);
249     DeterministicSchedule::afterSharedAccess();
250     return rv;
251   }
252
253   T operator&= (T v) noexcept {
254     DeterministicSchedule::beforeSharedAccess();
255     T rv = (data &= v);
256     DeterministicSchedule::afterSharedAccess();
257     return rv;
258   }
259
260   T operator|= (T v) noexcept {
261     DeterministicSchedule::beforeSharedAccess();
262     T rv = (data |= v);
263     DeterministicSchedule::afterSharedAccess();
264     return rv;
265   }
266 };
267
268 }}
269
270 namespace folly { namespace detail {
271
272 template<>
273 bool Futex<test::DeterministicAtomic>::futexWait(uint32_t expected,
274                                                  uint32_t waitMask);
275
276 template<>
277 int Futex<test::DeterministicAtomic>::futexWake(int count, uint32_t wakeMask);
278
279 }}