Fix copyright lines
[folly.git] / folly / test / ThreadCachedIntTest.cpp
1 /*
2  * Copyright 2011-present Facebook, Inc.
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  *   http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16
17 #include <folly/ThreadCachedInt.h>
18
19 #include <atomic>
20 #include <condition_variable>
21 #include <memory>
22 #include <thread>
23
24 #include <glog/logging.h>
25
26 #include <folly/Benchmark.h>
27 #include <folly/hash/Hash.h>
28 #include <folly/portability/GFlags.h>
29 #include <folly/portability/GTest.h>
30 #include <folly/system/ThreadId.h>
31
32 using namespace folly;
33
34 using std::unique_ptr;
35 using std::vector;
36
37 using Counter = ThreadCachedInt<int64_t>;
38
39 class ThreadCachedIntTest : public testing::Test {
40  public:
41   uint32_t GetDeadThreadsTotal(const Counter& counter) {
42     return counter.readFast();
43   }
44 };
45
46 // Multithreaded tests.  Creates a specified number of threads each of
47 // which iterates a different amount and dies.
48
49 namespace {
50 // Set cacheSize to be large so cached data moves to target_ only when
51 // thread dies.
52 Counter g_counter_for_mt_slow(0, UINT32_MAX);
53 Counter g_counter_for_mt_fast(0, UINT32_MAX);
54
55 // Used to sync between threads.  The value of this variable is the
56 // maximum iteration index upto which Runner() is allowed to go.
57 uint32_t g_sync_for_mt(0);
58 std::condition_variable cv;
59 std::mutex cv_m;
60
61 // Performs the specified number of iterations.  Within each
62 // iteration, it increments counter 10 times.  At the beginning of
63 // each iteration it checks g_sync_for_mt to see if it can proceed,
64 // otherwise goes into a loop sleeping and rechecking.
65 void Runner(Counter* counter, uint32_t iterations) {
66   for (uint32_t i = 0; i < iterations; ++i) {
67     std::unique_lock<std::mutex> lk(cv_m);
68     cv.wait(lk, [i] { return i < g_sync_for_mt; });
69     for (uint32_t j = 0; j < 10; ++j) {
70       counter->increment(1);
71     }
72   }
73 }
74 } // namespace
75
76 // Slow test with fewer threads where there are more busy waits and
77 // many calls to readFull().  This attempts to test as many of the
78 // code paths in Counter as possible to ensure that counter values are
79 // properly passed from thread local state, both at calls to
80 // readFull() and at thread death.
81 TEST_F(ThreadCachedIntTest, MultithreadedSlow) {
82   static constexpr uint32_t kNumThreads = 20;
83   g_sync_for_mt = 0;
84   vector<unique_ptr<std::thread>> threads(kNumThreads);
85   // Creates kNumThreads threads.  Each thread performs a different
86   // number of iterations in Runner() - threads[0] performs 1
87   // iteration, threads[1] performs 2 iterations, threads[2] performs
88   // 3 iterations, and so on.
89   for (uint32_t i = 0; i < kNumThreads; ++i) {
90     threads[i] =
91         std::make_unique<std::thread>(Runner, &g_counter_for_mt_slow, i + 1);
92   }
93   // Variable to grab current counter value.
94   int32_t counter_value;
95   // The expected value of the counter.
96   int32_t total = 0;
97   // The expected value of GetDeadThreadsTotal().
98   int32_t dead_total = 0;
99   // Each iteration of the following thread allows one additional
100   // iteration of the threads.  Given that the threads perform
101   // different number of iterations from 1 through kNumThreads, one
102   // thread will complete in each of the iterations of the loop below.
103   for (uint32_t i = 0; i < kNumThreads; ++i) {
104     // Allow upto iteration i on all threads.
105     {
106       std::lock_guard<std::mutex> lk(cv_m);
107       g_sync_for_mt = i + 1;
108     }
109     cv.notify_all();
110     total += (kNumThreads - i) * 10;
111     // Loop until the counter reaches its expected value.
112     do {
113       counter_value = g_counter_for_mt_slow.readFull();
114     } while (counter_value < total);
115     // All threads have done what they can until iteration i, now make
116     // sure they don't go further by checking 10 more times in the
117     // following loop.
118     for (uint32_t j = 0; j < 10; ++j) {
119       counter_value = g_counter_for_mt_slow.readFull();
120       EXPECT_EQ(total, counter_value);
121     }
122     dead_total += (i + 1) * 10;
123     EXPECT_GE(dead_total, GetDeadThreadsTotal(g_counter_for_mt_slow));
124   }
125   // All threads are done.
126   for (uint32_t i = 0; i < kNumThreads; ++i) {
127     threads[i]->join();
128   }
129   counter_value = g_counter_for_mt_slow.readFull();
130   EXPECT_EQ(total, counter_value);
131   EXPECT_EQ(total, dead_total);
132   EXPECT_EQ(dead_total, GetDeadThreadsTotal(g_counter_for_mt_slow));
133 }
134
135 // Fast test with lots of threads and only one call to readFull()
136 // at the end.
137 TEST_F(ThreadCachedIntTest, MultithreadedFast) {
138   static constexpr uint32_t kNumThreads = 1000;
139   g_sync_for_mt = 0;
140   vector<unique_ptr<std::thread>> threads(kNumThreads);
141   // Creates kNumThreads threads.  Each thread performs a different
142   // number of iterations in Runner() - threads[0] performs 1
143   // iteration, threads[1] performs 2 iterations, threads[2] performs
144   // 3 iterations, and so on.
145   for (uint32_t i = 0; i < kNumThreads; ++i) {
146     threads[i] =
147         std::make_unique<std::thread>(Runner, &g_counter_for_mt_fast, i + 1);
148   }
149   // Let the threads run to completion.
150   {
151     std::lock_guard<std::mutex> lk(cv_m);
152     g_sync_for_mt = kNumThreads;
153   }
154   cv.notify_all();
155   // The expected value of the counter.
156   uint32_t total = 0;
157   for (uint32_t i = 0; i < kNumThreads; ++i) {
158     total += (kNumThreads - i) * 10;
159   }
160   // Wait for all threads to complete.
161   for (uint32_t i = 0; i < kNumThreads; ++i) {
162     threads[i]->join();
163   }
164   int32_t counter_value = g_counter_for_mt_fast.readFull();
165   EXPECT_EQ(total, counter_value);
166   EXPECT_EQ(total, GetDeadThreadsTotal(g_counter_for_mt_fast));
167 }
168
169 TEST(ThreadCachedInt, SingleThreadedNotCached) {
170   ThreadCachedInt<int64_t> val(0, 0);
171   EXPECT_EQ(0, val.readFast());
172   ++val;
173   EXPECT_EQ(1, val.readFast());
174   for (int i = 0; i < 41; ++i) {
175     val.increment(1);
176   }
177   EXPECT_EQ(42, val.readFast());
178   --val;
179   EXPECT_EQ(41, val.readFast());
180 }
181
182 // Note: This is somewhat fragile to the implementation.  If this causes
183 // problems, feel free to remove it.
184 TEST(ThreadCachedInt, SingleThreadedCached) {
185   ThreadCachedInt<int64_t> val(0, 10);
186   EXPECT_EQ(0, val.readFast());
187   ++val;
188   EXPECT_EQ(0, val.readFast());
189   for (int i = 0; i < 7; ++i) {
190     val.increment(1);
191   }
192   EXPECT_EQ(0, val.readFast());
193   EXPECT_EQ(0, val.readFastAndReset());
194   EXPECT_EQ(8, val.readFull());
195   EXPECT_EQ(8, val.readFullAndReset());
196   EXPECT_EQ(0, val.readFull());
197   EXPECT_EQ(0, val.readFast());
198 }
199
200 ThreadCachedInt<int32_t> globalInt32(0, 11);
201 ThreadCachedInt<int64_t> globalInt64(0, 11);
202 int kNumInserts = 100000;
203 DEFINE_int32(numThreads, 8, "Number simultaneous threads for benchmarks.");
204 #define CREATE_INC_FUNC(size)                                       \
205   void incFunc ## size () {                                         \
206     const int num = kNumInserts / FLAGS_numThreads;                 \
207     for (int i = 0; i < num; ++i) {                                 \
208       ++globalInt ## size ;                                         \
209     }                                                               \
210   }
211 CREATE_INC_FUNC(64);
212 CREATE_INC_FUNC(32);
213
214 // Confirms counts are accurate with competing threads
215 TEST(ThreadCachedInt, MultiThreadedCached) {
216   kNumInserts = 100000;
217   CHECK_EQ(0, kNumInserts % FLAGS_numThreads) <<
218     "FLAGS_numThreads must evenly divide kNumInserts (" << kNumInserts << ").";
219   const int numPerThread = kNumInserts / FLAGS_numThreads;
220   ThreadCachedInt<int64_t> TCInt64(0, numPerThread - 2);
221   {
222     std::atomic<bool> run(true);
223     std::atomic<int> threadsDone(0);
224     std::vector<std::thread> threads;
225     for (int i = 0; i < FLAGS_numThreads; ++i) {
226       threads.push_back(std::thread([&] {
227         FOR_EACH_RANGE(k, 0, numPerThread) {
228           ++TCInt64;
229         }
230         std::atomic_fetch_add(&threadsDone, 1);
231         while (run.load()) { usleep(100); }
232       }));
233     }
234
235     // We create and increment another ThreadCachedInt here to make sure it
236     // doesn't interact with the other instances
237     ThreadCachedInt<int64_t> otherTCInt64(0, 10);
238     otherTCInt64.set(33);
239     ++otherTCInt64;
240
241     while (threadsDone.load() < FLAGS_numThreads) { usleep(100); }
242
243     ++otherTCInt64;
244
245     // Threads are done incrementing, but caches have not been flushed yet, so
246     // we have to readFull.
247     EXPECT_NE(kNumInserts, TCInt64.readFast());
248     EXPECT_EQ(kNumInserts, TCInt64.readFull());
249
250     run.store(false);
251     for (auto& t : threads) {
252       t.join();
253     }
254
255   }  // Caches are flushed when threads finish
256   EXPECT_EQ(kNumInserts, TCInt64.readFast());
257 }
258
259 #define MAKE_MT_CACHE_SIZE_BM(size)                             \
260   void BM_mt_cache_size ## size (int iters, int cacheSize) {    \
261     kNumInserts = iters;                                        \
262     globalInt ## size.set(0);                                   \
263     globalInt ## size.setCacheSize(cacheSize);                  \
264     std::vector<std::thread> threads;                           \
265     for (int i = 0; i < FLAGS_numThreads; ++i) {                \
266       threads.push_back(std::thread(incFunc ## size));          \
267     }                                                           \
268     for (auto& t : threads) {                                   \
269       t.join();                                                 \
270     }                                                           \
271   }
272 MAKE_MT_CACHE_SIZE_BM(64);
273 MAKE_MT_CACHE_SIZE_BM(32);
274
275 #define REG_BASELINE(name, inc_stmt)                            \
276   BENCHMARK(FB_CONCATENATE(BM_mt_baseline_, name), iters) {     \
277     const int iterPerThread = iters / FLAGS_numThreads;         \
278     std::vector<std::thread> threads;                           \
279     for (int i = 0; i < FLAGS_numThreads; ++i) {                \
280       threads.push_back(std::thread([&]() {                     \
281             for (int j = 0; j < iterPerThread; ++j) {           \
282               inc_stmt;                                         \
283             }                                                   \
284           }));                                                  \
285     }                                                           \
286     for (auto& t : threads) {                                   \
287       t.join();                                                 \
288     }                                                           \
289   }
290
291 ThreadLocal<int64_t> globalTL64Baseline;
292 ThreadLocal<int32_t> globalTL32Baseline;
293 std::atomic<int64_t> globalInt64Baseline(0);
294 std::atomic<int32_t> globalInt32Baseline(0);
295 FOLLY_TLS int64_t global__thread64;
296 FOLLY_TLS int32_t global__thread32;
297
298 // Alternate lock-free implementation.  Achieves about the same performance,
299 // but uses about 20x more memory than ThreadCachedInt with 24 threads.
300 struct ShardedAtomicInt {
301   static const int64_t kBuckets_ = 2048;
302   std::atomic<int64_t> ints_[kBuckets_];
303
304   inline void inc(int64_t val = 1) {
305     int buck = hash::twang_mix64(folly::getCurrentThreadID()) & (kBuckets_ - 1);
306     std::atomic_fetch_add(&ints_[buck], val);
307   }
308
309   // read the first few and extrapolate
310   int64_t readFast() {
311     int64_t ret = 0;
312     static const int numToRead = 8;
313     FOR_EACH_RANGE(i, 0, numToRead) {
314       ret += ints_[i].load(std::memory_order_relaxed);
315     }
316     return ret * (kBuckets_ / numToRead);
317   }
318
319   // readFull is lock-free, but has to do thousands of loads...
320   int64_t readFull() {
321     int64_t ret = 0;
322     for (auto& i : ints_) {
323       // Fun fact - using memory_order_consume below reduces perf 30-40% in high
324       // contention benchmarks.
325       ret += i.load(std::memory_order_relaxed);
326     }
327     return ret;
328   }
329 };
330 ShardedAtomicInt shd_int64;
331
332 REG_BASELINE(_thread64, global__thread64 += 1);
333 REG_BASELINE(_thread32, global__thread32 += 1);
334 REG_BASELINE(ThreadLocal64, *globalTL64Baseline += 1);
335 REG_BASELINE(ThreadLocal32, *globalTL32Baseline += 1);
336 REG_BASELINE(atomic_inc64,
337              std::atomic_fetch_add(&globalInt64Baseline, int64_t(1)));
338 REG_BASELINE(atomic_inc32,
339              std::atomic_fetch_add(&globalInt32Baseline, int32_t(1)));
340 REG_BASELINE(ShardedAtm64, shd_int64.inc());
341
342 BENCHMARK_PARAM(BM_mt_cache_size64, 0);
343 BENCHMARK_PARAM(BM_mt_cache_size64, 10);
344 BENCHMARK_PARAM(BM_mt_cache_size64, 100);
345 BENCHMARK_PARAM(BM_mt_cache_size64, 1000);
346 BENCHMARK_PARAM(BM_mt_cache_size32, 0);
347 BENCHMARK_PARAM(BM_mt_cache_size32, 10);
348 BENCHMARK_PARAM(BM_mt_cache_size32, 100);
349 BENCHMARK_PARAM(BM_mt_cache_size32, 1000);
350 BENCHMARK_DRAW_LINE();
351
352 // single threaded
353 BENCHMARK(Atomic_readFull) {
354   doNotOptimizeAway(globalInt64Baseline.load(std::memory_order_relaxed));
355 }
356 BENCHMARK(ThrCache_readFull) {
357   doNotOptimizeAway(globalInt64.readFull());
358 }
359 BENCHMARK(Sharded_readFull) {
360   doNotOptimizeAway(shd_int64.readFull());
361 }
362 BENCHMARK(ThrCache_readFast) {
363   doNotOptimizeAway(globalInt64.readFast());
364 }
365 BENCHMARK(Sharded_readFast) {
366   doNotOptimizeAway(shd_int64.readFast());
367 }
368 BENCHMARK_DRAW_LINE();
369
370 // multi threaded
371 REG_BASELINE(Atomic_readFull,
372       doNotOptimizeAway(globalInt64Baseline.load(std::memory_order_relaxed)));
373 REG_BASELINE(ThrCache_readFull, doNotOptimizeAway(globalInt64.readFull()));
374 REG_BASELINE(Sharded_readFull, doNotOptimizeAway(shd_int64.readFull()));
375 REG_BASELINE(ThrCache_readFast, doNotOptimizeAway(globalInt64.readFast()));
376 REG_BASELINE(Sharded_readFast, doNotOptimizeAway(shd_int64.readFast()));
377 BENCHMARK_DRAW_LINE();
378
379 int main(int argc, char** argv) {
380   testing::InitGoogleTest(&argc, argv);
381   gflags::ParseCommandLineFlags(&argc, &argv, true);
382   gflags::SetCommandLineOptionWithMode(
383     "bm_min_usec", "10000", gflags::SET_FLAG_IF_DEFAULT
384   );
385   if (FLAGS_benchmark) {
386     folly::runBenchmarks();
387   }
388   return RUN_ALL_TESTS();
389 }
390
391 /*
392  Ran with 20 threads on dual 12-core Xeon(R) X5650 @ 2.67GHz with 12-MB caches
393
394  Benchmark                               Iters   Total t    t/iter iter/sec
395  ------------------------------------------------------------------------------
396  + 103% BM_mt_baseline__thread64     10000000  13.54 ms  1.354 ns  704.4 M
397 *       BM_mt_baseline__thread32     10000000  6.651 ms  665.1 ps    1.4 G
398  +50.3% BM_mt_baseline_ThreadLocal64  10000000  9.994 ms  999.4 ps  954.2 M
399  +49.9% BM_mt_baseline_ThreadLocal32  10000000  9.972 ms  997.2 ps  956.4 M
400  +2650% BM_mt_baseline_atomic_inc64  10000000  182.9 ms  18.29 ns  52.13 M
401  +2665% BM_mt_baseline_atomic_inc32  10000000  183.9 ms  18.39 ns  51.85 M
402  +75.3% BM_mt_baseline_ShardedAtm64  10000000  11.66 ms  1.166 ns  817.8 M
403  +6670% BM_mt_cache_size64/0         10000000  450.3 ms  45.03 ns  21.18 M
404  +1644% BM_mt_cache_size64/10        10000000    116 ms   11.6 ns   82.2 M
405  + 381% BM_mt_cache_size64/100       10000000  32.04 ms  3.204 ns  297.7 M
406  + 129% BM_mt_cache_size64/1000      10000000  15.24 ms  1.524 ns  625.8 M
407  +6052% BM_mt_cache_size32/0         10000000  409.2 ms  40.92 ns  23.31 M
408  +1304% BM_mt_cache_size32/10        10000000  93.39 ms  9.339 ns  102.1 M
409  + 298% BM_mt_cache_size32/100       10000000  26.52 ms  2.651 ns  359.7 M
410  +68.1% BM_mt_cache_size32/1000      10000000  11.18 ms  1.118 ns  852.9 M
411 ------------------------------------------------------------------------------
412  +10.4% Atomic_readFull              10000000  36.05 ms  3.605 ns  264.5 M
413  + 619% ThrCache_readFull            10000000  235.1 ms  23.51 ns  40.57 M
414  SLOW   Sharded_readFull              1981093      2 s    1.01 us  967.3 k
415 *       ThrCache_readFast            10000000  32.65 ms  3.265 ns  292.1 M
416  +10.0% Sharded_readFast             10000000  35.92 ms  3.592 ns  265.5 M
417 ------------------------------------------------------------------------------
418  +4.54% BM_mt_baseline_Atomic_readFull  10000000  8.672 ms  867.2 ps  1.074 G
419  SLOW   BM_mt_baseline_ThrCache_readFull  10000000  996.9 ms  99.69 ns  9.567 M
420  SLOW   BM_mt_baseline_Sharded_readFull  10000000  891.5 ms  89.15 ns   10.7 M
421 *       BM_mt_baseline_ThrCache_readFast  10000000  8.295 ms  829.5 ps  1.123 G
422  +12.7% BM_mt_baseline_Sharded_readFast  10000000  9.348 ms  934.8 ps   1020 M
423 ------------------------------------------------------------------------------
424 */