Thread Observer
[folly.git] / folly / wangle / concurrent / test / ThreadPoolExecutorTest.cpp
1 /*
2  * Copyright 2014 Facebook, Inc.
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  *   http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16
17 #include <folly/wangle/concurrent/FutureExecutor.h>
18 #include <folly/wangle/concurrent/ThreadPoolExecutor.h>
19 #include <folly/wangle/concurrent/CPUThreadPoolExecutor.h>
20 #include <folly/wangle/concurrent/IOThreadPoolExecutor.h>
21 #include <glog/logging.h>
22 #include <gtest/gtest.h>
23
24 using namespace folly::wangle;
25 using namespace std::chrono;
26
27 static folly::Func burnMs(uint64_t ms) {
28   return [ms]() { std::this_thread::sleep_for(milliseconds(ms)); };
29 }
30
31 template <class TPE>
32 static void basic() {
33   // Create and destroy
34   TPE tpe(10);
35 }
36
37 TEST(ThreadPoolExecutorTest, CPUBasic) {
38   basic<CPUThreadPoolExecutor>();
39 }
40
41 TEST(IOThreadPoolExecutorTest, IOBasic) {
42   basic<IOThreadPoolExecutor>();
43 }
44
45 template <class TPE>
46 static void resize() {
47   TPE tpe(100);
48   EXPECT_EQ(100, tpe.numThreads());
49   tpe.setNumThreads(50);
50   EXPECT_EQ(50, tpe.numThreads());
51   tpe.setNumThreads(150);
52   EXPECT_EQ(150, tpe.numThreads());
53 }
54
55 TEST(ThreadPoolExecutorTest, CPUResize) {
56   resize<CPUThreadPoolExecutor>();
57 }
58
59 TEST(ThreadPoolExecutorTest, IOResize) {
60   resize<IOThreadPoolExecutor>();
61 }
62
63 template <class TPE>
64 static void stop() {
65   TPE tpe(1);
66   std::atomic<int> completed(0);
67   auto f = [&](){
68     burnMs(10)();
69     completed++;
70   };
71   for (int i = 0; i < 1000; i++) {
72     tpe.add(f);
73   }
74   tpe.stop();
75   EXPECT_GT(1000, completed);
76 }
77
78 // IOThreadPoolExecutor's stop() behaves like join(). Outstanding tasks belong
79 // to the event base, will be executed upon its destruction, and cannot be
80 // taken back.
81 template <>
82 void stop<IOThreadPoolExecutor>() {
83   IOThreadPoolExecutor tpe(1);
84   std::atomic<int> completed(0);
85   auto f = [&](){
86     burnMs(10)();
87     completed++;
88   };
89   for (int i = 0; i < 10; i++) {
90     tpe.add(f);
91   }
92   tpe.stop();
93   EXPECT_EQ(10, completed);
94 }
95
96 TEST(ThreadPoolExecutorTest, CPUStop) {
97   stop<CPUThreadPoolExecutor>();
98 }
99
100 TEST(ThreadPoolExecutorTest, IOStop) {
101   stop<IOThreadPoolExecutor>();
102 }
103
104 template <class TPE>
105 static void join() {
106   TPE tpe(10);
107   std::atomic<int> completed(0);
108   auto f = [&](){
109     burnMs(1)();
110     completed++;
111   };
112   for (int i = 0; i < 1000; i++) {
113     tpe.add(f);
114   }
115   tpe.join();
116   EXPECT_EQ(1000, completed);
117 }
118
119 TEST(ThreadPoolExecutorTest, CPUJoin) {
120   join<CPUThreadPoolExecutor>();
121 }
122
123 TEST(ThreadPoolExecutorTest, IOJoin) {
124   join<IOThreadPoolExecutor>();
125 }
126
127 template <class TPE>
128 static void resizeUnderLoad() {
129   TPE tpe(10);
130   std::atomic<int> completed(0);
131   auto f = [&](){
132     burnMs(1)();
133     completed++;
134   };
135   for (int i = 0; i < 1000; i++) {
136     tpe.add(f);
137   }
138   tpe.setNumThreads(5);
139   tpe.setNumThreads(15);
140   tpe.join();
141   EXPECT_EQ(1000, completed);
142 }
143
144 TEST(ThreadPoolExecutorTest, CPUResizeUnderLoad) {
145   resizeUnderLoad<CPUThreadPoolExecutor>();
146 }
147
148 TEST(ThreadPoolExecutorTest, IOResizeUnderLoad) {
149   resizeUnderLoad<IOThreadPoolExecutor>();
150 }
151
152 template <class TPE>
153 static void poolStats() {
154   folly::Baton<> startBaton, endBaton;
155   TPE tpe(1);
156   auto stats = tpe.getPoolStats();
157   EXPECT_EQ(1, stats.threadCount);
158   EXPECT_EQ(1, stats.idleThreadCount);
159   EXPECT_EQ(0, stats.activeThreadCount);
160   EXPECT_EQ(0, stats.pendingTaskCount);
161   EXPECT_EQ(0, stats.totalTaskCount);
162   tpe.add([&](){ startBaton.post(); endBaton.wait(); });
163   tpe.add([&](){});
164   startBaton.wait();
165   stats = tpe.getPoolStats();
166   EXPECT_EQ(1, stats.threadCount);
167   EXPECT_EQ(0, stats.idleThreadCount);
168   EXPECT_EQ(1, stats.activeThreadCount);
169   EXPECT_EQ(1, stats.pendingTaskCount);
170   EXPECT_EQ(2, stats.totalTaskCount);
171   endBaton.post();
172 }
173
174 TEST(ThreadPoolExecutorTest, CPUPoolStats) {
175   poolStats<CPUThreadPoolExecutor>();
176 }
177
178 TEST(ThreadPoolExecutorTest, IOPoolStats) {
179   poolStats<IOThreadPoolExecutor>();
180 }
181
182 template <class TPE>
183 static void taskStats() {
184   TPE tpe(1);
185   std::atomic<int> c(0);
186   auto s = tpe.subscribeToTaskStats(
187       Observer<ThreadPoolExecutor::TaskStats>::create(
188           [&](ThreadPoolExecutor::TaskStats stats) {
189         int i = c++;
190         EXPECT_LT(milliseconds(0), stats.runTime);
191         if (i == 1) {
192           EXPECT_LT(milliseconds(0), stats.waitTime);
193         }
194       }));
195   tpe.add(burnMs(10));
196   tpe.add(burnMs(10));
197   tpe.join();
198   EXPECT_EQ(2, c);
199 }
200
201 TEST(ThreadPoolExecutorTest, CPUTaskStats) {
202   taskStats<CPUThreadPoolExecutor>();
203 }
204
205 TEST(ThreadPoolExecutorTest, IOTaskStats) {
206   taskStats<IOThreadPoolExecutor>();
207 }
208
209 template <class TPE>
210 static void expiration() {
211   TPE tpe(1);
212   std::atomic<int> statCbCount(0);
213   auto s = tpe.subscribeToTaskStats(
214       Observer<ThreadPoolExecutor::TaskStats>::create(
215           [&](ThreadPoolExecutor::TaskStats stats) {
216         int i = statCbCount++;
217         if (i == 0) {
218           EXPECT_FALSE(stats.expired);
219         } else if (i == 1) {
220           EXPECT_TRUE(stats.expired);
221         } else {
222           FAIL();
223         }
224       }));
225   std::atomic<int> expireCbCount(0);
226   auto expireCb = [&] () { expireCbCount++; };
227   tpe.add(burnMs(10), seconds(60), expireCb);
228   tpe.add(burnMs(10), milliseconds(10), expireCb);
229   tpe.join();
230   EXPECT_EQ(2, statCbCount);
231   EXPECT_EQ(1, expireCbCount);
232 }
233
234 TEST(ThreadPoolExecutorTest, CPUExpiration) {
235   expiration<CPUThreadPoolExecutor>();
236 }
237
238 TEST(ThreadPoolExecutorTest, IOExpiration) {
239   expiration<IOThreadPoolExecutor>();
240 }
241
242 template <typename TPE>
243 static void futureExecutor() {
244   FutureExecutor<TPE> fe(2);
245   std::atomic<int> c{0};
246   fe.addFuture([] () { return makeFuture<int>(42); }).then(
247     [&] (Try<int>&& t) {
248       c++;
249       EXPECT_EQ(42, t.value());
250     });
251   fe.addFuture([] () { return 100; }).then(
252     [&] (Try<int>&& t) {
253       c++;
254       EXPECT_EQ(100, t.value());
255     });
256   fe.addFuture([] () { return makeFuture(); }).then(
257     [&] (Try<void>&& t) {
258       c++;
259       EXPECT_NO_THROW(t.value());
260     });
261   fe.addFuture([] () { return; }).then(
262     [&] (Try<void>&& t) {
263       c++;
264       EXPECT_NO_THROW(t.value());
265     });
266   fe.addFuture([] () { throw std::runtime_error("oops"); }).then(
267     [&] (Try<void>&& t) {
268       c++;
269       EXPECT_THROW(t.value(), std::runtime_error);
270     });
271   // Test doing actual async work
272   folly::Baton<> baton;
273   fe.addFuture([&] () {
274     auto p = std::make_shared<Promise<int>>();
275     std::thread t([p](){
276       burnMs(10)();
277       p->setValue(42);
278     });
279     t.detach();
280     return p->getFuture();
281   }).then([&] (Try<int>&& t) {
282     EXPECT_EQ(42, t.value());
283     c++;
284     baton.post();
285   });
286   baton.wait();
287   fe.join();
288   EXPECT_EQ(6, c);
289 }
290
291 TEST(ThreadPoolExecutorTest, CPUFuturePool) {
292   futureExecutor<CPUThreadPoolExecutor>();
293 }
294
295 TEST(ThreadPoolExecutorTest, IOFuturePool) {
296   futureExecutor<IOThreadPoolExecutor>();
297 }
298
299 TEST(ThreadPoolExecutorTest, PriorityPreemptionTest) {
300   bool tookLopri = false;
301   auto completed = 0;
302   auto hipri = [&] {
303     EXPECT_FALSE(tookLopri);
304     completed++;
305   };
306   auto lopri = [&] {
307     tookLopri = true;
308     completed++;
309   };
310   CPUThreadPoolExecutor pool(0, 2);
311   for (int i = 0; i < 50; i++) {
312     pool.add(lopri, 0);
313   }
314   for (int i = 0; i < 50; i++) {
315     pool.add(hipri, 1);
316   }
317   pool.setNumThreads(1);
318   pool.join();
319   EXPECT_EQ(100, completed);
320 }
321
322 class TestObserver : public ThreadPoolExecutor::Observer {
323  public:
324   void threadStarted(ThreadPoolExecutor::ThreadHandle*) {
325     threads_++;
326   }
327   void threadStopped(ThreadPoolExecutor::ThreadHandle*) {
328     threads_--;
329   }
330   void threadPreviouslyStarted(ThreadPoolExecutor::ThreadHandle*) {
331     threads_++;
332   }
333   void threadNotYetStopped(ThreadPoolExecutor::ThreadHandle*) {
334     threads_--;
335   }
336   void checkCalls() {
337     ASSERT_EQ(threads_, 0);
338   }
339  private:
340   int threads_{0};
341 };
342
343 TEST(ThreadPoolExecutorTest, IOObserver) {
344   auto observer = std::make_shared<TestObserver>();
345
346   {
347     IOThreadPoolExecutor exe(10);
348     exe.addObserver(observer);
349     exe.setNumThreads(3);
350     exe.setNumThreads(0);
351     exe.setNumThreads(7);
352     exe.removeObserver(observer);
353     exe.setNumThreads(10);
354   }
355
356   observer->checkCalls();
357 }
358
359 TEST(ThreadPoolExecutorTest, CPUObserver) {
360   auto observer = std::make_shared<TestObserver>();
361
362   {
363     CPUThreadPoolExecutor exe(10);
364     exe.addObserver(observer);
365     exe.setNumThreads(3);
366     exe.setNumThreads(0);
367     exe.setNumThreads(7);
368     exe.removeObserver(observer);
369     exe.setNumThreads(10);
370   }
371
372   observer->checkCalls();
373 }