subscriptions
[folly.git] / folly / experimental / wangle / concurrent / test / ThreadPoolExecutorTest.cpp
1 /*
2  * Copyright 2014 Facebook, Inc.
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  *   http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16
17 #include <folly/experimental/wangle/concurrent/FutureExecutor.h>
18 #include <folly/experimental/wangle/concurrent/ThreadPoolExecutor.h>
19 #include <folly/experimental/wangle/concurrent/CPUThreadPoolExecutor.h>
20 #include <folly/experimental/wangle/concurrent/IOThreadPoolExecutor.h>
21 #include <glog/logging.h>
22 #include <gtest/gtest.h>
23
24 using namespace folly::wangle;
25 using namespace std::chrono;
26
27 static Func burnMs(uint64_t ms) {
28   return [ms]() { std::this_thread::sleep_for(milliseconds(ms)); };
29 }
30
31 template <class TPE>
32 static void basic() {
33   // Create and destroy
34   TPE tpe(10);
35 }
36
37 TEST(ThreadPoolExecutorTest, CPUBasic) {
38   basic<CPUThreadPoolExecutor>();
39 }
40
41 TEST(IOThreadPoolExecutorTest, IOBasic) {
42   basic<IOThreadPoolExecutor>();
43 }
44
45 template <class TPE>
46 static void resize() {
47   TPE tpe(100);
48   EXPECT_EQ(100, tpe.numThreads());
49   tpe.setNumThreads(50);
50   EXPECT_EQ(50, tpe.numThreads());
51   tpe.setNumThreads(150);
52   EXPECT_EQ(150, tpe.numThreads());
53 }
54
55 TEST(ThreadPoolExecutorTest, CPUResize) {
56   resize<CPUThreadPoolExecutor>();
57 }
58
59 TEST(ThreadPoolExecutorTest, IOResize) {
60   resize<IOThreadPoolExecutor>();
61 }
62
63 template <class TPE>
64 static void stop() {
65   TPE tpe(1);
66   std::atomic<int> completed(0);
67   auto f = [&](){
68     burnMs(10)();
69     completed++;
70   };
71   for (int i = 0; i < 1000; i++) {
72     tpe.add(f);
73   }
74   tpe.stop();
75   EXPECT_GT(1000, completed);
76 }
77
78 TEST(ThreadPoolExecutorTest, CPUStop) {
79   stop<CPUThreadPoolExecutor>();
80 }
81
82 TEST(ThreadPoolExecutorTest, IOStop) {
83   stop<IOThreadPoolExecutor>();
84 }
85
86 template <class TPE>
87 static void join() {
88   TPE tpe(10);
89   std::atomic<int> completed(0);
90   auto f = [&](){
91     burnMs(1)();
92     completed++;
93   };
94   for (int i = 0; i < 1000; i++) {
95     tpe.add(f);
96   }
97   tpe.join();
98   EXPECT_EQ(1000, completed);
99 }
100
101 TEST(ThreadPoolExecutorTest, CPUJoin) {
102   join<CPUThreadPoolExecutor>();
103 }
104
105 TEST(ThreadPoolExecutorTest, IOJoin) {
106   join<IOThreadPoolExecutor>();
107 }
108
109 template <class TPE>
110 static void resizeUnderLoad() {
111   TPE tpe(10);
112   std::atomic<int> completed(0);
113   auto f = [&](){
114     burnMs(1)();
115     completed++;
116   };
117   for (int i = 0; i < 1000; i++) {
118     tpe.add(f);
119   }
120   tpe.setNumThreads(5);
121   tpe.setNumThreads(15);
122   tpe.join();
123   EXPECT_EQ(1000, completed);
124 }
125
126 TEST(ThreadPoolExecutorTest, CPUResizeUnderLoad) {
127   resizeUnderLoad<CPUThreadPoolExecutor>();
128 }
129
130 TEST(ThreadPoolExecutorTest, IOResizeUnderLoad) {
131   resizeUnderLoad<IOThreadPoolExecutor>();
132 }
133
134 template <class TPE>
135 static void poolStats() {
136   folly::Baton<> startBaton, endBaton;
137   TPE tpe(1);
138   auto stats = tpe.getPoolStats();
139   EXPECT_EQ(1, stats.threadCount);
140   EXPECT_EQ(1, stats.idleThreadCount);
141   EXPECT_EQ(0, stats.activeThreadCount);
142   EXPECT_EQ(0, stats.pendingTaskCount);
143   EXPECT_EQ(0, stats.totalTaskCount);
144   tpe.add([&](){ startBaton.post(); endBaton.wait(); });
145   tpe.add([&](){});
146   startBaton.wait();
147   stats = tpe.getPoolStats();
148   EXPECT_EQ(1, stats.threadCount);
149   EXPECT_EQ(0, stats.idleThreadCount);
150   EXPECT_EQ(1, stats.activeThreadCount);
151   EXPECT_EQ(1, stats.pendingTaskCount);
152   EXPECT_EQ(2, stats.totalTaskCount);
153   endBaton.post();
154 }
155
156 TEST(ThreadPoolExecutorTest, CPUPoolStats) {
157   poolStats<CPUThreadPoolExecutor>();
158 }
159
160 TEST(ThreadPoolExecutorTest, IOPoolStats) {
161   poolStats<IOThreadPoolExecutor>();
162 }
163
164 template <class TPE>
165 static void taskStats() {
166   TPE tpe(1);
167   std::atomic<int> c(0);
168   auto s = tpe.subscribeToTaskStats(
169       Observer<ThreadPoolExecutor::TaskStats>::create(
170           [&](ThreadPoolExecutor::TaskStats stats) {
171         int i = c++;
172         EXPECT_LT(milliseconds(0), stats.runTime);
173         if (i == 1) {
174           EXPECT_LT(milliseconds(0), stats.waitTime);
175         }
176       }));
177   tpe.add(burnMs(10));
178   tpe.add(burnMs(10));
179   tpe.join();
180   EXPECT_EQ(2, c);
181 }
182
183 TEST(ThreadPoolExecutorTest, CPUTaskStats) {
184   taskStats<CPUThreadPoolExecutor>();
185 }
186
187 TEST(ThreadPoolExecutorTest, IOTaskStats) {
188   taskStats<IOThreadPoolExecutor>();
189 }
190
191 template <class TPE>
192 static void expiration() {
193   TPE tpe(1);
194   std::atomic<int> statCbCount(0);
195   auto s = tpe.subscribeToTaskStats(
196       Observer<ThreadPoolExecutor::TaskStats>::create(
197           [&](ThreadPoolExecutor::TaskStats stats) {
198         int i = statCbCount++;
199         if (i == 0) {
200           EXPECT_FALSE(stats.expired);
201         } else if (i == 1) {
202           EXPECT_TRUE(stats.expired);
203         } else {
204           FAIL();
205         }
206       }));
207   std::atomic<int> expireCbCount(0);
208   auto expireCb = [&] () { expireCbCount++; };
209   tpe.add(burnMs(10), seconds(60), expireCb);
210   tpe.add(burnMs(10), milliseconds(10), expireCb);
211   tpe.join();
212   EXPECT_EQ(2, statCbCount);
213   EXPECT_EQ(1, expireCbCount);
214 }
215
216 TEST(ThreadPoolExecutorTest, CPUExpiration) {
217   expiration<CPUThreadPoolExecutor>();
218 }
219
220 TEST(ThreadPoolExecutorTest, IOExpiration) {
221   expiration<IOThreadPoolExecutor>();
222 }
223
224 template <typename TPE>
225 static void futureExecutor() {
226   FutureExecutor<TPE> fe(2);
227   int c = 0;
228   fe.addFuture([] () { return makeFuture<int>(42); }).then(
229     [&] (Try<int>&& t) {
230       c++;
231       EXPECT_EQ(42, t.value());
232     });
233   fe.addFuture([] () { return 100; }).then(
234     [&] (Try<int>&& t) {
235       c++;
236       EXPECT_EQ(100, t.value());
237     });
238   fe.addFuture([] () { return makeFuture(); }).then(
239     [&] (Try<void>&& t) {
240       c++;
241       EXPECT_NO_THROW(t.value());
242     });
243   fe.addFuture([] () { return; }).then(
244     [&] (Try<void>&& t) {
245       c++;
246       EXPECT_NO_THROW(t.value());
247     });
248   fe.addFuture([] () { throw std::runtime_error("oops"); }).then(
249     [&] (Try<void>&& t) {
250       c++;
251       EXPECT_THROW(t.value(), std::runtime_error);
252     });
253   // Test doing actual async work
254   folly::Baton<> baton;
255   fe.addFuture([&] () {
256     auto p = std::make_shared<Promise<int>>();
257     std::thread t([p](){
258       burnMs(10)();
259       p->setValue(42);
260     });
261     t.detach();
262     return p->getFuture();
263   }).then([&] (Try<int>&& t) {
264     EXPECT_EQ(42, t.value());
265     c++;
266     baton.post();
267   });
268   baton.wait();
269   fe.join();
270   EXPECT_EQ(6, c);
271 }
272
273 TEST(ThreadPoolExecutorTest, CPUFuturePool) {
274   futureExecutor<CPUThreadPoolExecutor>();
275 }
276
277 TEST(ThreadPoolExecutorTest, IOFuturePool) {
278   futureExecutor<IOThreadPoolExecutor>();
279 }