fix more flakey tests
[folly.git] / folly / experimental / wangle / concurrent / test / ThreadPoolExecutorTest.cpp
1 /*
2  * Copyright 2014 Facebook, Inc.
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  *   http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16
17 #include <folly/experimental/wangle/concurrent/ThreadPoolExecutor.h>
18 #include <folly/experimental/wangle/concurrent/CPUThreadPoolExecutor.h>
19 #include <folly/experimental/wangle/concurrent/IOThreadPoolExecutor.h>
20 #include <glog/logging.h>
21 #include <gtest/gtest.h>
22
23 using namespace folly::wangle;
24 using namespace std::chrono;
25
26 static Func burnMs(uint64_t ms) {
27   return [ms]() { std::this_thread::sleep_for(milliseconds(ms)); };
28 }
29
30 template <class TPE>
31 static void basic() {
32   // Create and destroy
33   TPE tpe(10);
34 }
35
36 TEST(ThreadPoolExecutorTest, CPUBasic) {
37   basic<CPUThreadPoolExecutor>();
38 }
39
40 TEST(IOThreadPoolExecutorTest, IOBasic) {
41   basic<IOThreadPoolExecutor>();
42 }
43
44 template <class TPE>
45 static void resize() {
46   TPE tpe(100);
47   EXPECT_EQ(100, tpe.numThreads());
48   tpe.setNumThreads(50);
49   EXPECT_EQ(50, tpe.numThreads());
50   tpe.setNumThreads(150);
51   EXPECT_EQ(150, tpe.numThreads());
52 }
53
54 TEST(ThreadPoolExecutorTest, CPUResize) {
55   resize<CPUThreadPoolExecutor>();
56 }
57
58 TEST(ThreadPoolExecutorTest, IOResize) {
59   resize<IOThreadPoolExecutor>();
60 }
61
62 template <class TPE>
63 static void stop() {
64   TPE tpe(1);
65   std::atomic<int> completed(0);
66   auto f = [&](){
67     burnMs(10)();
68     completed++;
69   };
70   for (int i = 0; i < 1000; i++) {
71     tpe.add(f);
72   }
73   tpe.stop();
74   EXPECT_GT(1000, completed);
75 }
76
77 TEST(ThreadPoolExecutorTest, CPUStop) {
78   stop<CPUThreadPoolExecutor>();
79 }
80
81 TEST(ThreadPoolExecutorTest, IOStop) {
82   stop<IOThreadPoolExecutor>();
83 }
84
85 template <class TPE>
86 static void join() {
87   TPE tpe(10);
88   std::atomic<int> completed(0);
89   auto f = [&](){
90     burnMs(1)();
91     completed++;
92   };
93   for (int i = 0; i < 1000; i++) {
94     tpe.add(f);
95   }
96   tpe.join();
97   EXPECT_EQ(1000, completed);
98 }
99
100 TEST(ThreadPoolExecutorTest, CPUJoin) {
101   join<CPUThreadPoolExecutor>();
102 }
103
104 TEST(ThreadPoolExecutorTest, IOJoin) {
105   join<IOThreadPoolExecutor>();
106 }
107
108 template <class TPE>
109 static void resizeUnderLoad() {
110   TPE tpe(10);
111   std::atomic<int> completed(0);
112   auto f = [&](){
113     burnMs(1)();
114     completed++;
115   };
116   for (int i = 0; i < 1000; i++) {
117     tpe.add(f);
118   }
119   tpe.setNumThreads(5);
120   tpe.setNumThreads(15);
121   tpe.join();
122   EXPECT_EQ(1000, completed);
123 }
124
125 TEST(ThreadPoolExecutorTest, CPUResizeUnderLoad) {
126   resizeUnderLoad<CPUThreadPoolExecutor>();
127 }
128
129 TEST(ThreadPoolExecutorTest, IOResizeUnderLoad) {
130   resizeUnderLoad<IOThreadPoolExecutor>();
131 }
132
133 template <class TPE>
134 static void poolStats() {
135   folly::Baton<> startBaton, endBaton;
136   TPE tpe(1);
137   auto stats = tpe.getPoolStats();
138   EXPECT_EQ(1, stats.threadCount);
139   EXPECT_EQ(1, stats.idleThreadCount);
140   EXPECT_EQ(0, stats.activeThreadCount);
141   EXPECT_EQ(0, stats.pendingTaskCount);
142   EXPECT_EQ(0, stats.totalTaskCount);
143   tpe.add([&](){ startBaton.post(); endBaton.wait(); });
144   tpe.add([&](){});
145   startBaton.wait();
146   stats = tpe.getPoolStats();
147   EXPECT_EQ(1, stats.threadCount);
148   EXPECT_EQ(0, stats.idleThreadCount);
149   EXPECT_EQ(1, stats.activeThreadCount);
150   EXPECT_EQ(1, stats.pendingTaskCount);
151   EXPECT_EQ(2, stats.totalTaskCount);
152   endBaton.post();
153 }
154
155 TEST(ThreadPoolExecutorTest, CPUPoolStats) {
156   poolStats<CPUThreadPoolExecutor>();
157 }
158
159 TEST(ThreadPoolExecutorTest, IOPoolStats) {
160   poolStats<IOThreadPoolExecutor>();
161 }
162
163 template <class TPE>
164 static void taskStats() {
165   TPE tpe(1);
166   std::atomic<int> c(0);
167   tpe.subscribeToTaskStats(Observer<ThreadPoolExecutor::TaskStats>::create(
168       [&] (ThreadPoolExecutor::TaskStats stats) {
169         int i = c++;
170         EXPECT_LT(milliseconds(0), stats.runTime);
171         if (i == 1) {
172           EXPECT_LT(milliseconds(0), stats.waitTime);
173         }
174       }));
175   tpe.add(burnMs(10));
176   tpe.add(burnMs(10));
177   tpe.join();
178   EXPECT_EQ(2, c);
179 }
180
181 TEST(ThreadPoolExecutorTest, CPUTaskStats) {
182   taskStats<CPUThreadPoolExecutor>();
183 }
184
185 TEST(ThreadPoolExecutorTest, IOTaskStats) {
186   taskStats<IOThreadPoolExecutor>();
187 }
188
189 template <class TPE>
190 static void expiration() {
191   TPE tpe(1);
192   std::atomic<int> statCbCount(0);
193   tpe.subscribeToTaskStats(Observer<ThreadPoolExecutor::TaskStats>::create(
194       [&] (ThreadPoolExecutor::TaskStats stats) {
195         int i = statCbCount++;
196         if (i == 0) {
197           EXPECT_FALSE(stats.expired);
198         } else if (i == 1) {
199           EXPECT_TRUE(stats.expired);
200         } else {
201           FAIL();
202         }
203       }));
204   std::atomic<int> expireCbCount(0);
205   auto expireCb = [&] () { expireCbCount++; };
206   tpe.add(burnMs(10), seconds(60), expireCb);
207   tpe.add(burnMs(10), milliseconds(10), expireCb);
208   tpe.join();
209   EXPECT_EQ(2, statCbCount);
210   EXPECT_EQ(1, expireCbCount);
211 }
212
213 TEST(ThreadPoolExecutorTest, CPUExpiration) {
214   expiration<CPUThreadPoolExecutor>();
215 }
216
217 TEST(ThreadPoolExecutorTest, IOExpiration) {
218   expiration<IOThreadPoolExecutor>();
219 }