c9a1e0ba20beb9a4a412ec0e1a07d2f35969183b
[folly.git] / folly / futures / test / BarrierTest.cpp
1 /*
2  * Copyright 2015 Facebook, Inc.
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  *   http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16
17 #include <folly/futures/Barrier.h>
18
19 #include <atomic>
20 #include <condition_variable>
21 #include <mutex>
22
23 #include <folly/Random.h>
24 #include <glog/logging.h>
25 #include <gtest/gtest.h>
26
27 namespace folly { namespace futures { namespace test {
28
29 TEST(BarrierTest, Simple) {
30   constexpr uint32_t numThreads = 10;
31
32   std::mutex mutex;
33   std::condition_variable b1DoneCond;
34   std::condition_variable b2DoneCond;
35   std::atomic<uint32_t> b1TrueSeen(0);
36   std::atomic<uint32_t> b1Passed(0);
37   std::atomic<uint32_t> b2TrueSeen(0);
38   std::atomic<uint32_t> b2Passed(0);
39
40   Barrier barrier(numThreads + 1);
41
42   std::vector<std::thread> threads;
43   threads.reserve(numThreads);
44   for (uint32_t i = 0; i < numThreads; ++i) {
45     threads.emplace_back([&] () {
46       barrier.wait()
47         .then(
48             [&] (bool v) {
49               std::unique_lock<std::mutex> lock(mutex);
50               b1TrueSeen += uint32_t(v);
51               if (++b1Passed == numThreads) {
52                 b1DoneCond.notify_one();
53               }
54               return barrier.wait();
55             })
56         .then(
57             [&] (bool v) {
58               std::unique_lock<std::mutex> lock(mutex);
59               b2TrueSeen += uint32_t(v);
60               if (++b2Passed == numThreads) {
61                 b2DoneCond.notify_one();
62               }
63             })
64         .get();
65     });
66   }
67
68   /* sleep override */
69   std::this_thread::sleep_for(std::chrono::milliseconds(50));
70   EXPECT_EQ(0, b1Passed);
71   EXPECT_EQ(0, b1TrueSeen);
72
73   b1TrueSeen += barrier.wait().get();
74
75   {
76     std::unique_lock<std::mutex> lock(mutex);
77     while (b1Passed != numThreads) {
78       b1DoneCond.wait(lock);
79     }
80     EXPECT_EQ(1, b1TrueSeen);
81   }
82
83   /* sleep override */
84   std::this_thread::sleep_for(std::chrono::milliseconds(50));
85   EXPECT_EQ(0, b2Passed);
86   EXPECT_EQ(0, b2TrueSeen);
87
88   b2TrueSeen += barrier.wait().get();
89
90   {
91     std::unique_lock<std::mutex> lock(mutex);
92     while (b2Passed != numThreads) {
93       b2DoneCond.wait(lock);
94     }
95     EXPECT_EQ(1, b2TrueSeen);
96   }
97
98   for (auto& t : threads) {
99     t.join();
100   }
101 }
102
103 TEST(BarrierTest, Random) {
104   // Create numThreads threads.
105   //
106   // Each thread repeats the following numIterations times:
107   //   - grab a randomly chosen number of futures from the barrier, waiting
108   //     for a short random time between each
109   //   - wait for all futures to complete
110   //   - record whether the one future returning true was seen among them
111   //
112   // At the end, we verify that exactly one future returning true was seen
113   // for each iteration.
114   constexpr uint32_t numIterations = 1;
115   auto numThreads = folly::Random::rand32(30, 91);
116
117   struct ThreadInfo {
118     ThreadInfo() { }
119     std::thread thread;
120     uint32_t iteration = 0;
121     uint32_t numFutures;
122     std::vector<uint32_t> trueSeen;
123   };
124
125   std::vector<ThreadInfo> threads;
126   threads.resize(numThreads);
127
128   uint32_t totalFutures = 0;
129   for (auto& tinfo : threads) {
130     tinfo.numFutures = folly::Random::rand32(100);
131     tinfo.trueSeen.resize(numIterations);
132     totalFutures += tinfo.numFutures;
133   }
134
135   Barrier barrier(totalFutures);
136
137   for (auto& tinfo : threads) {
138     auto pinfo = &tinfo;
139     tinfo.thread = std::thread(
140         [numIterations, pinfo, &barrier] () {
141           std::vector<folly::Future<bool>> futures;
142           futures.reserve(pinfo->numFutures);
143           for (uint32_t i = 0; i < numIterations; ++i, ++pinfo->iteration) {
144             futures.clear();
145             for (uint32_t j = 0; j < pinfo->numFutures; ++j) {
146               futures.push_back(barrier.wait());
147               auto nanos = folly::Random::rand32(10 * 1000 * 1000);
148               /* sleep override */
149               std::this_thread::sleep_for(std::chrono::nanoseconds(nanos));
150             }
151             auto results = folly::collect(futures).get();
152             pinfo->trueSeen[i] =
153               std::count(results.begin(), results.end(), true);
154           }
155         });
156   }
157
158   for (auto& tinfo : threads) {
159     tinfo.thread.join();
160     EXPECT_EQ(numIterations, tinfo.iteration);
161   }
162
163   for (uint32_t i = 0; i < numIterations; ++i) {
164     uint32_t trueCount = 0;
165     for (auto& tinfo : threads) {
166       trueCount += tinfo.trueSeen[i];
167     }
168     EXPECT_EQ(1, trueCount);
169   }
170 }
171
172 }}}  // namespaces