Revert "Fix folly::ThreadLocal to work in a shared library"
[folly.git] / folly / test / ReadMostlySharedPtrTest.cpp
1 /*
2  * Copyright 2015 Facebook, Inc.
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  *   http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 /* -*- Mode: C++; tab-width: 2; c-basic-offset: 2; indent-tabs-mode: nil -*- */
17
18 #include <atomic>
19 #include <thread>
20 #include <mutex>
21 #include <folly/Memory.h>
22 #include <condition_variable>
23 #include <gtest/gtest.h>
24
25 #include <folly/ReadMostlySharedPtr.h>
26
27 using folly::ReadMostlySharedPtr;
28
29 // send SIGALRM to test process after this many seconds
30 const unsigned int TEST_TIMEOUT = 10;
31
32 class ReadMostlySharedPtrTest : public ::testing::Test {
33  public:
34   ReadMostlySharedPtrTest() {
35     alarm(TEST_TIMEOUT);
36   }
37 };
38
39 struct TestObject {
40   int value;
41   std::atomic<int>& counter;
42
43   TestObject(int value, std::atomic<int>& counter)
44       : value(value), counter(counter) {
45     ++counter;
46   }
47
48   ~TestObject() {
49     assert(counter.load() > 0);
50     --counter;
51   }
52 };
53
54 // One side calls requestAndWait(), the other side calls waitForRequest(),
55 // does something and calls completed().
56 class Coordinator {
57  public:
58   void requestAndWait() {
59     {
60       std::lock_guard<std::mutex> lock(mutex);
61       assert(!is_requested);
62       assert(!is_completed);
63       is_requested = true;
64     }
65     cv.notify_all();
66     {
67       std::unique_lock<std::mutex> lock(mutex);
68       cv.wait(lock, [&] { return is_completed; });
69     }
70   }
71
72   void waitForRequest() {
73     std::unique_lock<std::mutex> lock(mutex);
74     assert(!is_completed);
75     cv.wait(lock, [&] { return is_requested; });
76   }
77
78   void completed() {
79     {
80       std::lock_guard<std::mutex> lock(mutex);
81       assert(is_requested);
82       is_completed = true;
83     }
84     cv.notify_all();
85   }
86
87  private:
88   bool is_requested = false;
89   bool is_completed = false;
90   std::condition_variable cv;
91   std::mutex mutex;
92 };
93
94 TEST_F(ReadMostlySharedPtrTest, BasicStores) {
95   ReadMostlySharedPtr<TestObject> ptr;
96
97   // Store 1.
98   std::atomic<int> cnt1{0};
99   ptr.store(folly::make_unique<TestObject>(1, cnt1));
100   EXPECT_EQ(1, cnt1.load());
101
102   // Store 2, check that 1 is destroyed.
103   std::atomic<int> cnt2{0};
104   ptr.store(folly::make_unique<TestObject>(2, cnt2));
105   EXPECT_EQ(1, cnt2.load());
106   EXPECT_EQ(0, cnt1.load());
107
108   // Store nullptr, check that 2 is destroyed.
109   ptr.store(nullptr);
110   EXPECT_EQ(0, cnt2.load());
111 }
112
113 TEST_F(ReadMostlySharedPtrTest, BasicLoads) {
114   std::atomic<int> cnt2{0};
115   ReadMostlySharedPtr<TestObject>::ReadPtr x;
116
117   {
118     ReadMostlySharedPtr<TestObject> ptr;
119
120     // Check that ptr is initially nullptr.
121     EXPECT_EQ(ptr.load(), nullptr);
122
123     std::atomic<int> cnt1{0};
124     ptr.store(folly::make_unique<TestObject>(1, cnt1));
125     EXPECT_EQ(1, cnt1.load());
126
127     x = ptr.load();
128     EXPECT_EQ(1, x->value);
129
130     ptr.store(folly::make_unique<TestObject>(2, cnt2));
131     EXPECT_EQ(1, cnt2.load());
132     EXPECT_EQ(1, cnt1.load());
133
134     x = ptr.load();
135     EXPECT_EQ(2, x->value);
136     EXPECT_EQ(0, cnt1.load());
137
138     ptr.store(nullptr);
139     EXPECT_EQ(1, cnt2.load());
140   }
141
142   EXPECT_EQ(1, cnt2.load());
143
144   x.reset();
145   EXPECT_EQ(0, cnt2.load());
146 }
147
148 TEST_F(ReadMostlySharedPtrTest, LoadsFromThreads) {
149   std::atomic<int> cnt{0};
150
151   {
152     ReadMostlySharedPtr<TestObject> ptr;
153     Coordinator loads[7];
154
155     std::thread t1([&] {
156       loads[0].waitForRequest();
157       EXPECT_EQ(ptr.load(), nullptr);
158       loads[0].completed();
159
160       loads[3].waitForRequest();
161       EXPECT_EQ(2, ptr.load()->value);
162       loads[3].completed();
163
164       loads[4].waitForRequest();
165       EXPECT_EQ(4, ptr.load()->value);
166       loads[4].completed();
167
168       loads[5].waitForRequest();
169       EXPECT_EQ(5, ptr.load()->value);
170       loads[5].completed();
171     });
172
173     std::thread t2([&] {
174       loads[1].waitForRequest();
175       EXPECT_EQ(1, ptr.load()->value);
176       loads[1].completed();
177
178       loads[2].waitForRequest();
179       EXPECT_EQ(2, ptr.load()->value);
180       loads[2].completed();
181
182       loads[6].waitForRequest();
183       EXPECT_EQ(5, ptr.load()->value);
184       loads[6].completed();
185     });
186
187     loads[0].requestAndWait();
188
189     ptr.store(folly::make_unique<TestObject>(1, cnt));
190     loads[1].requestAndWait();
191
192     ptr.store(folly::make_unique<TestObject>(2, cnt));
193     loads[2].requestAndWait();
194     loads[3].requestAndWait();
195
196     ptr.store(folly::make_unique<TestObject>(3, cnt));
197     ptr.store(folly::make_unique<TestObject>(4, cnt));
198     loads[4].requestAndWait();
199
200     ptr.store(folly::make_unique<TestObject>(5, cnt));
201     loads[5].requestAndWait();
202     loads[6].requestAndWait();
203
204     EXPECT_EQ(1, cnt.load());
205
206     t1.join();
207     t2.join();
208   }
209
210   EXPECT_EQ(0, cnt.load());
211 }
212
213 TEST_F(ReadMostlySharedPtrTest, Ctor) {
214   std::atomic<int> cnt1{0};
215   {
216     ReadMostlySharedPtr<TestObject> ptr(
217       folly::make_unique<TestObject>(1, cnt1));
218
219     EXPECT_EQ(1, ptr.load()->value);
220   }
221
222   EXPECT_EQ(0, cnt1.load());
223 }
224
225 TEST_F(ReadMostlySharedPtrTest, ClearingCache) {
226   ReadMostlySharedPtr<TestObject> ptr;
227
228   // Store 1.
229   std::atomic<int> cnt1{0};
230   ptr.store(folly::make_unique<TestObject>(1, cnt1));
231
232   Coordinator c;
233
234   std::thread t([&] {
235     // Cache the pointer for this thread.
236     ptr.load();
237     c.requestAndWait();
238   });
239
240   // Wait for the thread to cache pointer.
241   c.waitForRequest();
242   EXPECT_EQ(1, cnt1.load());
243
244   // Store 2 and check that 1 is destroyed.
245   std::atomic<int> cnt2{0};
246   ptr.store(folly::make_unique<TestObject>(2, cnt2));
247   EXPECT_EQ(0, cnt1.load());
248
249   // Unblock thread.
250   c.completed();
251   t.join();
252 }
253
254 TEST_F(ReadMostlySharedPtrTest, SlowDestructor) {
255   struct Thingy {
256     Coordinator* dtor;
257
258     Thingy(Coordinator* dtor = nullptr) : dtor(dtor) {}
259
260     ~Thingy() {
261       if (dtor) {
262         dtor->requestAndWait();
263       }
264     }
265   };
266
267   Coordinator dtor;
268
269   ReadMostlySharedPtr<Thingy> ptr;
270   ptr.store(folly::make_unique<Thingy>(&dtor));
271
272   std::thread t([&] {
273     // This will block in ~Thingy().
274     ptr.store(folly::make_unique<Thingy>());
275   });
276
277   // Wait until store() in thread calls ~T().
278   dtor.waitForRequest();
279   // Do a store while another store() is stuck in ~T().
280   ptr.store(folly::make_unique<Thingy>());
281   // Let the other store() go.
282   dtor.completed();
283
284   t.join();
285 }
286
287 TEST_F(ReadMostlySharedPtrTest, StressTest) {
288   const int ptr_count = 2;
289   const int thread_count = 5;
290   const std::chrono::milliseconds duration(100);
291   const std::chrono::milliseconds upd_delay(1);
292   const std::chrono::milliseconds respawn_delay(1);
293
294   struct Instance {
295     std::atomic<int> value{0};
296     std::atomic<int> prev_value{0};
297     ReadMostlySharedPtr<TestObject> ptr;
298   };
299
300   struct Thread {
301     std::thread t;
302     std::atomic<bool> shutdown{false};
303   };
304
305   std::atomic<int> counter(0);
306   std::vector<Instance> instances(ptr_count);
307   std::vector<Thread> threads(thread_count);
308   std::atomic<int> seed(0);
309
310   // Threads that call load() and checking value.
311   auto thread_func = [&](int t) {
312     pthread_setname_np(pthread_self(),
313                        ("load" + folly::to<std::string>(t)).c_str());
314     std::mt19937 rnd(++seed);
315     while (!threads[t].shutdown.load()) {
316       Instance& instance = instances[rnd() % instances.size()];
317       int val1 = instance.prev_value.load();
318       auto p = instance.ptr.load();
319       int val = p ? p->value : 0;
320       int val2 = instance.value.load();
321       EXPECT_LE(val1, val);
322       EXPECT_LE(val, val2);
323     }
324   };
325
326   for (size_t t = 0; t < threads.size(); ++t) {
327     threads[t].t = std::thread(thread_func, t);
328   }
329
330   std::atomic<bool> shutdown(false);
331
332   // Thread that calls store() occasionally.
333   std::thread update_thread([&] {
334     pthread_setname_np(pthread_self(), "store");
335     std::mt19937 rnd(++seed);
336     while (!shutdown.load()) {
337       Instance& instance = instances[rnd() % instances.size()];
338       int val = ++instance.value;
339       instance.ptr.store(folly::make_unique<TestObject>(val, counter));
340       ++instance.prev_value;
341       /* sleep override */
342       std::this_thread::sleep_for(upd_delay);
343     }
344   });
345
346   // Thread that joins and spawns load() threads occasionally.
347   std::thread respawn_thread([&] {
348     pthread_setname_np(pthread_self(), "respawn");
349     std::mt19937 rnd(++seed);
350     while (!shutdown.load()) {
351       int t = rnd() % threads.size();
352       threads[t].shutdown.store(true);
353       threads[t].t.join();
354       threads[t].shutdown.store(false);
355       threads[t].t = std::thread(thread_func, t);
356
357       /* sleep override */
358       std::this_thread::sleep_for(respawn_delay);
359     }
360   });
361
362   // Let all of this run for some time.
363   /* sleep override */
364   std::this_thread::sleep_for(duration);
365
366   // Shut all of this down.
367   shutdown.store(true);
368
369   update_thread.join();
370   respawn_thread.join();
371   for (auto& t: threads) {
372     t.shutdown.store(true);
373     t.t.join();
374   }
375
376   for (auto& instance: instances) {
377     instance.ptr.store(nullptr);
378     EXPECT_EQ(instance.value.load(), instance.prev_value.load());
379   }
380
381   EXPECT_EQ(0, counter.load());
382 }