fix future executor test
[folly.git] / folly / experimental / wangle / concurrent / test / ThreadPoolExecutorTest.cpp
1 /*
2  * Copyright 2014 Facebook, Inc.
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  *   http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16
17 #include <folly/experimental/wangle/concurrent/FutureExecutor.h>
18 #include <folly/experimental/wangle/concurrent/ThreadPoolExecutor.h>
19 #include <folly/experimental/wangle/concurrent/CPUThreadPoolExecutor.h>
20 #include <folly/experimental/wangle/concurrent/IOThreadPoolExecutor.h>
21 #include <glog/logging.h>
22 #include <gtest/gtest.h>
23
24 using namespace folly::wangle;
25 using namespace std::chrono;
26
27 static Func burnMs(uint64_t ms) {
28   return [ms]() { std::this_thread::sleep_for(milliseconds(ms)); };
29 }
30
31 template <class TPE>
32 static void basic() {
33   // Create and destroy
34   TPE tpe(10);
35 }
36
37 TEST(ThreadPoolExecutorTest, CPUBasic) {
38   basic<CPUThreadPoolExecutor>();
39 }
40
41 TEST(IOThreadPoolExecutorTest, IOBasic) {
42   basic<IOThreadPoolExecutor>();
43 }
44
45 template <class TPE>
46 static void resize() {
47   TPE tpe(100);
48   EXPECT_EQ(100, tpe.numThreads());
49   tpe.setNumThreads(50);
50   EXPECT_EQ(50, tpe.numThreads());
51   tpe.setNumThreads(150);
52   EXPECT_EQ(150, tpe.numThreads());
53 }
54
55 TEST(ThreadPoolExecutorTest, CPUResize) {
56   resize<CPUThreadPoolExecutor>();
57 }
58
59 TEST(ThreadPoolExecutorTest, IOResize) {
60   resize<IOThreadPoolExecutor>();
61 }
62
63 template <class TPE>
64 static void stop() {
65   TPE tpe(1);
66   std::atomic<int> completed(0);
67   auto f = [&](){
68     burnMs(10)();
69     completed++;
70   };
71   for (int i = 0; i < 1000; i++) {
72     tpe.add(f);
73   }
74   tpe.stop();
75   EXPECT_GT(1000, completed);
76 }
77
78 TEST(ThreadPoolExecutorTest, CPUStop) {
79   stop<CPUThreadPoolExecutor>();
80 }
81
82 TEST(ThreadPoolExecutorTest, IOStop) {
83   stop<IOThreadPoolExecutor>();
84 }
85
86 template <class TPE>
87 static void join() {
88   TPE tpe(10);
89   std::atomic<int> completed(0);
90   auto f = [&](){
91     burnMs(1)();
92     completed++;
93   };
94   for (int i = 0; i < 1000; i++) {
95     tpe.add(f);
96   }
97   tpe.join();
98   EXPECT_EQ(1000, completed);
99 }
100
101 TEST(ThreadPoolExecutorTest, CPUJoin) {
102   join<CPUThreadPoolExecutor>();
103 }
104
105 TEST(ThreadPoolExecutorTest, IOJoin) {
106   join<IOThreadPoolExecutor>();
107 }
108
109 template <class TPE>
110 static void resizeUnderLoad() {
111   TPE tpe(10);
112   std::atomic<int> completed(0);
113   auto f = [&](){
114     burnMs(1)();
115     completed++;
116   };
117   for (int i = 0; i < 1000; i++) {
118     tpe.add(f);
119   }
120   tpe.setNumThreads(5);
121   tpe.setNumThreads(15);
122   tpe.join();
123   EXPECT_EQ(1000, completed);
124 }
125
126 TEST(ThreadPoolExecutorTest, CPUResizeUnderLoad) {
127   resizeUnderLoad<CPUThreadPoolExecutor>();
128 }
129
130 TEST(ThreadPoolExecutorTest, IOResizeUnderLoad) {
131   resizeUnderLoad<IOThreadPoolExecutor>();
132 }
133
134 template <class TPE>
135 static void poolStats() {
136   folly::Baton<> startBaton, endBaton;
137   TPE tpe(1);
138   auto stats = tpe.getPoolStats();
139   EXPECT_EQ(1, stats.threadCount);
140   EXPECT_EQ(1, stats.idleThreadCount);
141   EXPECT_EQ(0, stats.activeThreadCount);
142   EXPECT_EQ(0, stats.pendingTaskCount);
143   EXPECT_EQ(0, stats.totalTaskCount);
144   tpe.add([&](){ startBaton.post(); endBaton.wait(); });
145   tpe.add([&](){});
146   startBaton.wait();
147   stats = tpe.getPoolStats();
148   EXPECT_EQ(1, stats.threadCount);
149   EXPECT_EQ(0, stats.idleThreadCount);
150   EXPECT_EQ(1, stats.activeThreadCount);
151   EXPECT_EQ(1, stats.pendingTaskCount);
152   EXPECT_EQ(2, stats.totalTaskCount);
153   endBaton.post();
154 }
155
156 TEST(ThreadPoolExecutorTest, CPUPoolStats) {
157   poolStats<CPUThreadPoolExecutor>();
158 }
159
160 TEST(ThreadPoolExecutorTest, IOPoolStats) {
161   poolStats<IOThreadPoolExecutor>();
162 }
163
164 template <class TPE>
165 static void taskStats() {
166   TPE tpe(1);
167   std::atomic<int> c(0);
168   tpe.subscribeToTaskStats(Observer<ThreadPoolExecutor::TaskStats>::create(
169       [&] (ThreadPoolExecutor::TaskStats stats) {
170         int i = c++;
171         EXPECT_LT(milliseconds(0), stats.runTime);
172         if (i == 1) {
173           EXPECT_LT(milliseconds(0), stats.waitTime);
174         }
175       }));
176   tpe.add(burnMs(10));
177   tpe.add(burnMs(10));
178   tpe.join();
179   EXPECT_EQ(2, c);
180 }
181
182 TEST(ThreadPoolExecutorTest, CPUTaskStats) {
183   taskStats<CPUThreadPoolExecutor>();
184 }
185
186 TEST(ThreadPoolExecutorTest, IOTaskStats) {
187   taskStats<IOThreadPoolExecutor>();
188 }
189
190 template <class TPE>
191 static void expiration() {
192   TPE tpe(1);
193   std::atomic<int> statCbCount(0);
194   tpe.subscribeToTaskStats(Observer<ThreadPoolExecutor::TaskStats>::create(
195       [&] (ThreadPoolExecutor::TaskStats stats) {
196         int i = statCbCount++;
197         if (i == 0) {
198           EXPECT_FALSE(stats.expired);
199         } else if (i == 1) {
200           EXPECT_TRUE(stats.expired);
201         } else {
202           FAIL();
203         }
204       }));
205   std::atomic<int> expireCbCount(0);
206   auto expireCb = [&] () { expireCbCount++; };
207   tpe.add(burnMs(10), seconds(60), expireCb);
208   tpe.add(burnMs(10), milliseconds(10), expireCb);
209   tpe.join();
210   EXPECT_EQ(2, statCbCount);
211   EXPECT_EQ(1, expireCbCount);
212 }
213
214 TEST(ThreadPoolExecutorTest, CPUExpiration) {
215   expiration<CPUThreadPoolExecutor>();
216 }
217
218 TEST(ThreadPoolExecutorTest, IOExpiration) {
219   expiration<IOThreadPoolExecutor>();
220 }
221
222 template <typename TPE>
223 static void futureExecutor() {
224   FutureExecutor<TPE> fe(2);
225   int c = 0;
226   fe.addFuture([] () { return makeFuture<int>(42); }).then(
227     [&] (Try<int>&& t) {
228       c++;
229       EXPECT_EQ(42, t.value());
230     });
231   fe.addFuture([] () { return 100; }).then(
232     [&] (Try<int>&& t) {
233       c++;
234       EXPECT_EQ(100, t.value());
235     });
236   fe.addFuture([] () { return makeFuture(); }).then(
237     [&] (Try<void>&& t) {
238       c++;
239       EXPECT_NO_THROW(t.value());
240     });
241   fe.addFuture([] () { return; }).then(
242     [&] (Try<void>&& t) {
243       c++;
244       EXPECT_NO_THROW(t.value());
245     });
246   fe.addFuture([] () { throw std::runtime_error("oops"); }).then(
247     [&] (Try<void>&& t) {
248       c++;
249       EXPECT_THROW(t.value(), std::runtime_error);
250     });
251   // Test doing actual async work
252   folly::Baton<> baton;
253   fe.addFuture([&] () {
254     auto p = std::make_shared<Promise<int>>();
255     std::thread t([p](){
256       burnMs(10)();
257       p->setValue(42);
258     });
259     t.detach();
260     return p->getFuture();
261   }).then([&] (Try<int>&& t) {
262     EXPECT_EQ(42, t.value());
263     c++;
264     baton.post();
265   });
266   baton.wait();
267   fe.join();
268   EXPECT_EQ(6, c);
269 }
270
271 TEST(ThreadPoolExecutorTest, CPUFuturePool) {
272   futureExecutor<CPUThreadPoolExecutor>();
273 }
274
275 TEST(ThreadPoolExecutorTest, IOFuturePool) {
276   futureExecutor<IOThreadPoolExecutor>();
277 }