Future<Unit> wangle fixup
[folly.git] / folly / wangle / concurrent / test / ThreadPoolExecutorTest.cpp
1 /*
2  * Copyright 2015 Facebook, Inc.
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  *   http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16
17 #include <folly/wangle/concurrent/FutureExecutor.h>
18 #include <folly/wangle/concurrent/ThreadPoolExecutor.h>
19 #include <folly/wangle/concurrent/CPUThreadPoolExecutor.h>
20 #include <folly/wangle/concurrent/IOThreadPoolExecutor.h>
21 #include <glog/logging.h>
22 #include <gtest/gtest.h>
23
24 using namespace folly;
25 using namespace folly::wangle;
26 using namespace std::chrono;
27
28 static folly::Func burnMs(uint64_t ms) {
29   return [ms]() { std::this_thread::sleep_for(milliseconds(ms)); };
30 }
31
32 template <class TPE>
33 static void basic() {
34   // Create and destroy
35   TPE tpe(10);
36 }
37
38 TEST(ThreadPoolExecutorTest, CPUBasic) {
39   basic<CPUThreadPoolExecutor>();
40 }
41
42 TEST(IOThreadPoolExecutorTest, IOBasic) {
43   basic<IOThreadPoolExecutor>();
44 }
45
46 template <class TPE>
47 static void resize() {
48   TPE tpe(100);
49   EXPECT_EQ(100, tpe.numThreads());
50   tpe.setNumThreads(50);
51   EXPECT_EQ(50, tpe.numThreads());
52   tpe.setNumThreads(150);
53   EXPECT_EQ(150, tpe.numThreads());
54 }
55
56 TEST(ThreadPoolExecutorTest, CPUResize) {
57   resize<CPUThreadPoolExecutor>();
58 }
59
60 TEST(ThreadPoolExecutorTest, IOResize) {
61   resize<IOThreadPoolExecutor>();
62 }
63
64 template <class TPE>
65 static void stop() {
66   TPE tpe(1);
67   std::atomic<int> completed(0);
68   auto f = [&](){
69     burnMs(10)();
70     completed++;
71   };
72   for (int i = 0; i < 1000; i++) {
73     tpe.add(f);
74   }
75   tpe.stop();
76   EXPECT_GT(1000, completed);
77 }
78
79 // IOThreadPoolExecutor's stop() behaves like join(). Outstanding tasks belong
80 // to the event base, will be executed upon its destruction, and cannot be
81 // taken back.
82 template <>
83 void stop<IOThreadPoolExecutor>() {
84   IOThreadPoolExecutor tpe(1);
85   std::atomic<int> completed(0);
86   auto f = [&](){
87     burnMs(10)();
88     completed++;
89   };
90   for (int i = 0; i < 10; i++) {
91     tpe.add(f);
92   }
93   tpe.stop();
94   EXPECT_EQ(10, completed);
95 }
96
97 TEST(ThreadPoolExecutorTest, CPUStop) {
98   stop<CPUThreadPoolExecutor>();
99 }
100
101 TEST(ThreadPoolExecutorTest, IOStop) {
102   stop<IOThreadPoolExecutor>();
103 }
104
105 template <class TPE>
106 static void join() {
107   TPE tpe(10);
108   std::atomic<int> completed(0);
109   auto f = [&](){
110     burnMs(1)();
111     completed++;
112   };
113   for (int i = 0; i < 1000; i++) {
114     tpe.add(f);
115   }
116   tpe.join();
117   EXPECT_EQ(1000, completed);
118 }
119
120 TEST(ThreadPoolExecutorTest, CPUJoin) {
121   join<CPUThreadPoolExecutor>();
122 }
123
124 TEST(ThreadPoolExecutorTest, IOJoin) {
125   join<IOThreadPoolExecutor>();
126 }
127
128 template <class TPE>
129 static void resizeUnderLoad() {
130   TPE tpe(10);
131   std::atomic<int> completed(0);
132   auto f = [&](){
133     burnMs(1)();
134     completed++;
135   };
136   for (int i = 0; i < 1000; i++) {
137     tpe.add(f);
138   }
139   tpe.setNumThreads(5);
140   tpe.setNumThreads(15);
141   tpe.join();
142   EXPECT_EQ(1000, completed);
143 }
144
145 TEST(ThreadPoolExecutorTest, CPUResizeUnderLoad) {
146   resizeUnderLoad<CPUThreadPoolExecutor>();
147 }
148
149 TEST(ThreadPoolExecutorTest, IOResizeUnderLoad) {
150   resizeUnderLoad<IOThreadPoolExecutor>();
151 }
152
153 template <class TPE>
154 static void poolStats() {
155   folly::Baton<> startBaton, endBaton;
156   TPE tpe(1);
157   auto stats = tpe.getPoolStats();
158   EXPECT_EQ(1, stats.threadCount);
159   EXPECT_EQ(1, stats.idleThreadCount);
160   EXPECT_EQ(0, stats.activeThreadCount);
161   EXPECT_EQ(0, stats.pendingTaskCount);
162   EXPECT_EQ(0, stats.totalTaskCount);
163   tpe.add([&](){ startBaton.post(); endBaton.wait(); });
164   tpe.add([&](){});
165   startBaton.wait();
166   stats = tpe.getPoolStats();
167   EXPECT_EQ(1, stats.threadCount);
168   EXPECT_EQ(0, stats.idleThreadCount);
169   EXPECT_EQ(1, stats.activeThreadCount);
170   EXPECT_EQ(1, stats.pendingTaskCount);
171   EXPECT_EQ(2, stats.totalTaskCount);
172   endBaton.post();
173 }
174
175 TEST(ThreadPoolExecutorTest, CPUPoolStats) {
176   poolStats<CPUThreadPoolExecutor>();
177 }
178
179 TEST(ThreadPoolExecutorTest, IOPoolStats) {
180   poolStats<IOThreadPoolExecutor>();
181 }
182
183 template <class TPE>
184 static void taskStats() {
185   TPE tpe(1);
186   std::atomic<int> c(0);
187   auto s = tpe.subscribeToTaskStats(
188       Observer<ThreadPoolExecutor::TaskStats>::create(
189           [&](ThreadPoolExecutor::TaskStats stats) {
190         int i = c++;
191         EXPECT_LT(milliseconds(0), stats.runTime);
192         if (i == 1) {
193           EXPECT_LT(milliseconds(0), stats.waitTime);
194         }
195       }));
196   tpe.add(burnMs(10));
197   tpe.add(burnMs(10));
198   tpe.join();
199   EXPECT_EQ(2, c);
200 }
201
202 TEST(ThreadPoolExecutorTest, CPUTaskStats) {
203   taskStats<CPUThreadPoolExecutor>();
204 }
205
206 TEST(ThreadPoolExecutorTest, IOTaskStats) {
207   taskStats<IOThreadPoolExecutor>();
208 }
209
210 template <class TPE>
211 static void expiration() {
212   TPE tpe(1);
213   std::atomic<int> statCbCount(0);
214   auto s = tpe.subscribeToTaskStats(
215       Observer<ThreadPoolExecutor::TaskStats>::create(
216           [&](ThreadPoolExecutor::TaskStats stats) {
217         int i = statCbCount++;
218         if (i == 0) {
219           EXPECT_FALSE(stats.expired);
220         } else if (i == 1) {
221           EXPECT_TRUE(stats.expired);
222         } else {
223           FAIL();
224         }
225       }));
226   std::atomic<int> expireCbCount(0);
227   auto expireCb = [&] () { expireCbCount++; };
228   tpe.add(burnMs(10), seconds(60), expireCb);
229   tpe.add(burnMs(10), milliseconds(10), expireCb);
230   tpe.join();
231   EXPECT_EQ(2, statCbCount);
232   EXPECT_EQ(1, expireCbCount);
233 }
234
235 TEST(ThreadPoolExecutorTest, CPUExpiration) {
236   expiration<CPUThreadPoolExecutor>();
237 }
238
239 TEST(ThreadPoolExecutorTest, IOExpiration) {
240   expiration<IOThreadPoolExecutor>();
241 }
242
243 template <typename TPE>
244 static void futureExecutor() {
245   FutureExecutor<TPE> fe(2);
246   std::atomic<int> c{0};
247   fe.addFuture([] () { return makeFuture<int>(42); }).then(
248     [&] (Try<int>&& t) {
249       c++;
250       EXPECT_EQ(42, t.value());
251     });
252   fe.addFuture([] () { return 100; }).then(
253     [&] (Try<int>&& t) {
254       c++;
255       EXPECT_EQ(100, t.value());
256     });
257   fe.addFuture([] () { return makeFuture(); }).then(
258     [&] (Try<Unit>&& t) {
259       c++;
260       EXPECT_NO_THROW(t.value());
261     });
262   fe.addFuture([] () { return; }).then(
263     [&] (Try<Unit>&& t) {
264       c++;
265       EXPECT_NO_THROW(t.value());
266     });
267   fe.addFuture([] () { throw std::runtime_error("oops"); }).then(
268     [&] (Try<Unit>&& t) {
269       c++;
270       EXPECT_THROW(t.value(), std::runtime_error);
271     });
272   // Test doing actual async work
273   folly::Baton<> baton;
274   fe.addFuture([&] () {
275     auto p = std::make_shared<Promise<int>>();
276     std::thread t([p](){
277       burnMs(10)();
278       p->setValue(42);
279     });
280     t.detach();
281     return p->getFuture();
282   }).then([&] (Try<int>&& t) {
283     EXPECT_EQ(42, t.value());
284     c++;
285     baton.post();
286   });
287   baton.wait();
288   fe.join();
289   EXPECT_EQ(6, c);
290 }
291
292 TEST(ThreadPoolExecutorTest, CPUFuturePool) {
293   futureExecutor<CPUThreadPoolExecutor>();
294 }
295
296 TEST(ThreadPoolExecutorTest, IOFuturePool) {
297   futureExecutor<IOThreadPoolExecutor>();
298 }
299
300 TEST(ThreadPoolExecutorTest, PriorityPreemptionTest) {
301   bool tookLopri = false;
302   auto completed = 0;
303   auto hipri = [&] {
304     EXPECT_FALSE(tookLopri);
305     completed++;
306   };
307   auto lopri = [&] {
308     tookLopri = true;
309     completed++;
310   };
311   CPUThreadPoolExecutor pool(0, 2);
312   for (int i = 0; i < 50; i++) {
313     pool.addWithPriority(lopri, Executor::LO_PRI);
314   }
315   for (int i = 0; i < 50; i++) {
316     pool.addWithPriority(hipri, Executor::HI_PRI);
317   }
318   pool.setNumThreads(1);
319   pool.join();
320   EXPECT_EQ(100, completed);
321 }
322
323 class TestObserver : public ThreadPoolExecutor::Observer {
324  public:
325   void threadStarted(ThreadPoolExecutor::ThreadHandle*) override { threads_++; }
326   void threadStopped(ThreadPoolExecutor::ThreadHandle*) override { threads_--; }
327   void threadPreviouslyStarted(ThreadPoolExecutor::ThreadHandle*) override {
328     threads_++;
329   }
330   void threadNotYetStopped(ThreadPoolExecutor::ThreadHandle*) override {
331     threads_--;
332   }
333   void checkCalls() {
334     ASSERT_EQ(threads_, 0);
335   }
336  private:
337   std::atomic<int> threads_{0};
338 };
339
340 TEST(ThreadPoolExecutorTest, IOObserver) {
341   auto observer = std::make_shared<TestObserver>();
342
343   {
344     IOThreadPoolExecutor exe(10);
345     exe.addObserver(observer);
346     exe.setNumThreads(3);
347     exe.setNumThreads(0);
348     exe.setNumThreads(7);
349     exe.removeObserver(observer);
350     exe.setNumThreads(10);
351   }
352
353   observer->checkCalls();
354 }
355
356 TEST(ThreadPoolExecutorTest, CPUObserver) {
357   auto observer = std::make_shared<TestObserver>();
358
359   {
360     CPUThreadPoolExecutor exe(10);
361     exe.addObserver(observer);
362     exe.setNumThreads(3);
363     exe.setNumThreads(0);
364     exe.setNumThreads(7);
365     exe.removeObserver(observer);
366     exe.setNumThreads(10);
367   }
368
369   observer->checkCalls();
370 }
371
372 TEST(ThreadPoolExecutorTest, AddWithPriority) {
373   std::atomic_int c{0};
374   auto f = [&]{ c++; };
375
376   // IO exe doesn't support priorities
377   IOThreadPoolExecutor ioExe(10);
378   EXPECT_THROW(ioExe.addWithPriority(f, 0), std::runtime_error);
379
380   CPUThreadPoolExecutor cpuExe(10, 3);
381   cpuExe.addWithPriority(f, -1);
382   cpuExe.addWithPriority(f, 0);
383   cpuExe.addWithPriority(f, 1);
384   cpuExe.addWithPriority(f, -2); // will add at the lowest priority
385   cpuExe.addWithPriority(f, 2);  // will add at the highest priority
386   cpuExe.addWithPriority(f, Executor::LO_PRI);
387   cpuExe.addWithPriority(f, Executor::HI_PRI);
388   cpuExe.join();
389
390   EXPECT_EQ(7, c);
391 }