3e9b68a288ca2b66fbdef2b9a6e9ea6c9b80d93a
[folly.git] / folly / executors / test / ThreadPoolExecutorTest.cpp
1 /*
2  * Copyright 2017 Facebook, Inc.
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  *   http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16
17 #include <memory>
18 #include <thread>
19
20 #include <folly/executors/CPUThreadPoolExecutor.h>
21 #include <folly/executors/FutureExecutor.h>
22 #include <folly/executors/IOThreadPoolExecutor.h>
23 #include <folly/executors/ThreadPoolExecutor.h>
24 #include <folly/executors/task_queue/LifoSemMPMCQueue.h>
25 #include <folly/executors/thread_factory/PriorityThreadFactory.h>
26 #include <folly/portability/GTest.h>
27
28 using namespace folly;
29 using namespace std::chrono;
30
31 static Func burnMs(uint64_t ms) {
32   return [ms]() { std::this_thread::sleep_for(milliseconds(ms)); };
33 }
34
35 template <class TPE>
36 static void basic() {
37   // Create and destroy
38   TPE tpe(10);
39 }
40
41 TEST(ThreadPoolExecutorTest, CPUBasic) {
42   basic<CPUThreadPoolExecutor>();
43 }
44
45 TEST(IOThreadPoolExecutorTest, IOBasic) {
46   basic<IOThreadPoolExecutor>();
47 }
48
49 template <class TPE>
50 static void resize() {
51   TPE tpe(100);
52   EXPECT_EQ(100, tpe.numThreads());
53   tpe.setNumThreads(50);
54   EXPECT_EQ(50, tpe.numThreads());
55   tpe.setNumThreads(150);
56   EXPECT_EQ(150, tpe.numThreads());
57 }
58
59 TEST(ThreadPoolExecutorTest, CPUResize) {
60   resize<CPUThreadPoolExecutor>();
61 }
62
63 TEST(ThreadPoolExecutorTest, IOResize) {
64   resize<IOThreadPoolExecutor>();
65 }
66
67 template <class TPE>
68 static void stop() {
69   TPE tpe(1);
70   std::atomic<int> completed(0);
71   auto f = [&]() {
72     burnMs(10)();
73     completed++;
74   };
75   for (int i = 0; i < 1000; i++) {
76     tpe.add(f);
77   }
78   tpe.stop();
79   EXPECT_GT(1000, completed);
80 }
81
82 // IOThreadPoolExecutor's stop() behaves like join(). Outstanding tasks belong
83 // to the event base, will be executed upon its destruction, and cannot be
84 // taken back.
85 template <>
86 void stop<IOThreadPoolExecutor>() {
87   IOThreadPoolExecutor tpe(1);
88   std::atomic<int> completed(0);
89   auto f = [&]() {
90     burnMs(10)();
91     completed++;
92   };
93   for (int i = 0; i < 10; i++) {
94     tpe.add(f);
95   }
96   tpe.stop();
97   EXPECT_EQ(10, completed);
98 }
99
100 TEST(ThreadPoolExecutorTest, CPUStop) {
101   stop<CPUThreadPoolExecutor>();
102 }
103
104 TEST(ThreadPoolExecutorTest, IOStop) {
105   stop<IOThreadPoolExecutor>();
106 }
107
108 template <class TPE>
109 static void join() {
110   TPE tpe(10);
111   std::atomic<int> completed(0);
112   auto f = [&]() {
113     burnMs(1)();
114     completed++;
115   };
116   for (int i = 0; i < 1000; i++) {
117     tpe.add(f);
118   }
119   tpe.join();
120   EXPECT_EQ(1000, completed);
121 }
122
123 TEST(ThreadPoolExecutorTest, CPUJoin) {
124   join<CPUThreadPoolExecutor>();
125 }
126
127 TEST(ThreadPoolExecutorTest, IOJoin) {
128   join<IOThreadPoolExecutor>();
129 }
130
131 template <class TPE>
132 static void resizeUnderLoad() {
133   TPE tpe(10);
134   std::atomic<int> completed(0);
135   auto f = [&]() {
136     burnMs(1)();
137     completed++;
138   };
139   for (int i = 0; i < 1000; i++) {
140     tpe.add(f);
141   }
142   tpe.setNumThreads(5);
143   tpe.setNumThreads(15);
144   tpe.join();
145   EXPECT_EQ(1000, completed);
146 }
147
148 TEST(ThreadPoolExecutorTest, CPUResizeUnderLoad) {
149   resizeUnderLoad<CPUThreadPoolExecutor>();
150 }
151
152 TEST(ThreadPoolExecutorTest, IOResizeUnderLoad) {
153   resizeUnderLoad<IOThreadPoolExecutor>();
154 }
155
156 template <class TPE>
157 static void poolStats() {
158   folly::Baton<> startBaton, endBaton;
159   TPE tpe(1);
160   auto stats = tpe.getPoolStats();
161   EXPECT_EQ(1, stats.threadCount);
162   EXPECT_EQ(1, stats.idleThreadCount);
163   EXPECT_EQ(0, stats.activeThreadCount);
164   EXPECT_EQ(0, stats.pendingTaskCount);
165   EXPECT_EQ(0, tpe.getPendingTaskCount());
166   EXPECT_EQ(0, stats.totalTaskCount);
167   tpe.add([&]() {
168     startBaton.post();
169     endBaton.wait();
170   });
171   tpe.add([&]() {});
172   startBaton.wait();
173   stats = tpe.getPoolStats();
174   EXPECT_EQ(1, stats.threadCount);
175   EXPECT_EQ(0, stats.idleThreadCount);
176   EXPECT_EQ(1, stats.activeThreadCount);
177   EXPECT_EQ(1, stats.pendingTaskCount);
178   EXPECT_EQ(1, tpe.getPendingTaskCount());
179   EXPECT_EQ(2, stats.totalTaskCount);
180   endBaton.post();
181 }
182
183 TEST(ThreadPoolExecutorTest, CPUPoolStats) {
184   poolStats<CPUThreadPoolExecutor>();
185 }
186
187 TEST(ThreadPoolExecutorTest, IOPoolStats) {
188   poolStats<IOThreadPoolExecutor>();
189 }
190
191 template <class TPE>
192 static void taskStats() {
193   TPE tpe(1);
194   std::atomic<int> c(0);
195   tpe.subscribeToTaskStats([&](ThreadPoolExecutor::TaskStats stats) {
196     int i = c++;
197     EXPECT_LT(milliseconds(0), stats.runTime);
198     if (i == 1) {
199       EXPECT_LT(milliseconds(0), stats.waitTime);
200     }
201   });
202   tpe.add(burnMs(10));
203   tpe.add(burnMs(10));
204   tpe.join();
205   EXPECT_EQ(2, c);
206 }
207
208 TEST(ThreadPoolExecutorTest, CPUTaskStats) {
209   taskStats<CPUThreadPoolExecutor>();
210 }
211
212 TEST(ThreadPoolExecutorTest, IOTaskStats) {
213   taskStats<IOThreadPoolExecutor>();
214 }
215
216 template <class TPE>
217 static void expiration() {
218   TPE tpe(1);
219   std::atomic<int> statCbCount(0);
220   tpe.subscribeToTaskStats([&](ThreadPoolExecutor::TaskStats stats) {
221     int i = statCbCount++;
222     if (i == 0) {
223       EXPECT_FALSE(stats.expired);
224     } else if (i == 1) {
225       EXPECT_TRUE(stats.expired);
226     } else {
227       FAIL();
228     }
229   });
230   std::atomic<int> expireCbCount(0);
231   auto expireCb = [&]() { expireCbCount++; };
232   tpe.add(burnMs(10), seconds(60), expireCb);
233   tpe.add(burnMs(10), milliseconds(10), expireCb);
234   tpe.join();
235   EXPECT_EQ(2, statCbCount);
236   EXPECT_EQ(1, expireCbCount);
237 }
238
239 TEST(ThreadPoolExecutorTest, CPUExpiration) {
240   expiration<CPUThreadPoolExecutor>();
241 }
242
243 TEST(ThreadPoolExecutorTest, IOExpiration) {
244   expiration<IOThreadPoolExecutor>();
245 }
246
247 template <typename TPE>
248 static void futureExecutor() {
249   FutureExecutor<TPE> fe(2);
250   std::atomic<int> c{0};
251   fe.addFuture([]() { return makeFuture<int>(42); }).then([&](Try<int>&& t) {
252     c++;
253     EXPECT_EQ(42, t.value());
254   });
255   fe.addFuture([]() { return 100; }).then([&](Try<int>&& t) {
256     c++;
257     EXPECT_EQ(100, t.value());
258   });
259   fe.addFuture([]() { return makeFuture(); }).then([&](Try<Unit>&& t) {
260     c++;
261     EXPECT_NO_THROW(t.value());
262   });
263   fe.addFuture([]() { return; }).then([&](Try<Unit>&& t) {
264     c++;
265     EXPECT_NO_THROW(t.value());
266   });
267   fe.addFuture([]() { throw std::runtime_error("oops"); })
268       .then([&](Try<Unit>&& t) {
269         c++;
270         EXPECT_THROW(t.value(), std::runtime_error);
271       });
272   // Test doing actual async work
273   folly::Baton<> baton;
274   fe.addFuture([&]() {
275       auto p = std::make_shared<Promise<int>>();
276       std::thread t([p]() {
277         burnMs(10)();
278         p->setValue(42);
279       });
280       t.detach();
281       return p->getFuture();
282     })
283       .then([&](Try<int>&& t) {
284         EXPECT_EQ(42, t.value());
285         c++;
286         baton.post();
287       });
288   baton.wait();
289   fe.join();
290   EXPECT_EQ(6, c);
291 }
292
293 TEST(ThreadPoolExecutorTest, CPUFuturePool) {
294   futureExecutor<CPUThreadPoolExecutor>();
295 }
296
297 TEST(ThreadPoolExecutorTest, IOFuturePool) {
298   futureExecutor<IOThreadPoolExecutor>();
299 }
300
301 TEST(ThreadPoolExecutorTest, PriorityPreemptionTest) {
302   bool tookLopri = false;
303   auto completed = 0;
304   auto hipri = [&] {
305     EXPECT_FALSE(tookLopri);
306     completed++;
307   };
308   auto lopri = [&] {
309     tookLopri = true;
310     completed++;
311   };
312   CPUThreadPoolExecutor pool(0, 2);
313   for (int i = 0; i < 50; i++) {
314     pool.addWithPriority(lopri, Executor::LO_PRI);
315   }
316   for (int i = 0; i < 50; i++) {
317     pool.addWithPriority(hipri, Executor::HI_PRI);
318   }
319   pool.setNumThreads(1);
320   pool.join();
321   EXPECT_EQ(100, completed);
322 }
323
324 class TestObserver : public ThreadPoolExecutor::Observer {
325  public:
326   void threadStarted(ThreadPoolExecutor::ThreadHandle*) override {
327     threads_++;
328   }
329   void threadStopped(ThreadPoolExecutor::ThreadHandle*) override {
330     threads_--;
331   }
332   void threadPreviouslyStarted(ThreadPoolExecutor::ThreadHandle*) override {
333     threads_++;
334   }
335   void threadNotYetStopped(ThreadPoolExecutor::ThreadHandle*) override {
336     threads_--;
337   }
338   void checkCalls() {
339     ASSERT_EQ(threads_, 0);
340   }
341
342  private:
343   std::atomic<int> threads_{0};
344 };
345
346 TEST(ThreadPoolExecutorTest, IOObserver) {
347   auto observer = std::make_shared<TestObserver>();
348
349   {
350     IOThreadPoolExecutor exe(10);
351     exe.addObserver(observer);
352     exe.setNumThreads(3);
353     exe.setNumThreads(0);
354     exe.setNumThreads(7);
355     exe.removeObserver(observer);
356     exe.setNumThreads(10);
357   }
358
359   observer->checkCalls();
360 }
361
362 TEST(ThreadPoolExecutorTest, CPUObserver) {
363   auto observer = std::make_shared<TestObserver>();
364
365   {
366     CPUThreadPoolExecutor exe(10);
367     exe.addObserver(observer);
368     exe.setNumThreads(3);
369     exe.setNumThreads(0);
370     exe.setNumThreads(7);
371     exe.removeObserver(observer);
372     exe.setNumThreads(10);
373   }
374
375   observer->checkCalls();
376 }
377
378 TEST(ThreadPoolExecutorTest, AddWithPriority) {
379   std::atomic_int c{0};
380   auto f = [&] { c++; };
381
382   // IO exe doesn't support priorities
383   IOThreadPoolExecutor ioExe(10);
384   EXPECT_THROW(ioExe.addWithPriority(f, 0), std::runtime_error);
385
386   CPUThreadPoolExecutor cpuExe(10, 3);
387   cpuExe.addWithPriority(f, -1);
388   cpuExe.addWithPriority(f, 0);
389   cpuExe.addWithPriority(f, 1);
390   cpuExe.addWithPriority(f, -2); // will add at the lowest priority
391   cpuExe.addWithPriority(f, 2); // will add at the highest priority
392   cpuExe.addWithPriority(f, Executor::LO_PRI);
393   cpuExe.addWithPriority(f, Executor::HI_PRI);
394   cpuExe.join();
395
396   EXPECT_EQ(7, c);
397 }
398
399 TEST(ThreadPoolExecutorTest, BlockingQueue) {
400   std::atomic_int c{0};
401   auto f = [&] {
402     burnMs(1)();
403     c++;
404   };
405   const int kQueueCapacity = 1;
406   const int kThreads = 1;
407
408   auto queue = std::make_unique<LifoSemMPMCQueue<
409       CPUThreadPoolExecutor::CPUTask,
410       QueueBehaviorIfFull::BLOCK>>(kQueueCapacity);
411
412   CPUThreadPoolExecutor cpuExe(
413       kThreads,
414       std::move(queue),
415       std::make_shared<NamedThreadFactory>("CPUThreadPool"));
416
417   // Add `f` five times. It sleeps for 1ms every time. Calling
418   // `cppExec.add()` is *almost* guaranteed to block because there's
419   // only 1 cpu worker thread.
420   for (int i = 0; i < 5; i++) {
421     EXPECT_NO_THROW(cpuExe.add(f));
422   }
423   cpuExe.join();
424
425   EXPECT_EQ(5, c);
426 }
427
428 TEST(PriorityThreadFactoryTest, ThreadPriority) {
429   PriorityThreadFactory factory(
430       std::make_shared<NamedThreadFactory>("stuff"), 1);
431   int actualPriority = -21;
432   factory.newThread([&]() { actualPriority = getpriority(PRIO_PROCESS, 0); })
433       .join();
434   EXPECT_EQ(1, actualPriority);
435 }
436
437 class TestData : public folly::RequestData {
438  public:
439   explicit TestData(int data) : data_(data) {}
440   ~TestData() override {}
441
442   bool hasCallback() override {
443     return false;
444   }
445
446   int data_;
447 };
448
449 TEST(ThreadPoolExecutorTest, RequestContext) {
450   CPUThreadPoolExecutor executor(1);
451
452   RequestContextScopeGuard rctx; // create new request context for this scope
453   EXPECT_EQ(nullptr, RequestContext::get()->getContextData("test"));
454   RequestContext::get()->setContextData("test", std::make_unique<TestData>(42));
455   auto data = RequestContext::get()->getContextData("test");
456   EXPECT_EQ(42, dynamic_cast<TestData*>(data)->data_);
457
458   executor.add([] {
459     auto data = RequestContext::get()->getContextData("test");
460     ASSERT_TRUE(data != nullptr);
461     EXPECT_EQ(42, dynamic_cast<TestData*>(data)->data_);
462   });
463 }
464
465 struct SlowMover {
466   explicit SlowMover(bool slow = false) : slow(slow) {}
467   SlowMover(SlowMover&& other) noexcept {
468     *this = std::move(other);
469   }
470   SlowMover& operator=(SlowMover&& other) noexcept {
471     slow = other.slow;
472     if (slow) {
473       /* sleep override */ std::this_thread::sleep_for(milliseconds(50));
474     }
475     return *this;
476   }
477
478   bool slow;
479 };
480
481 TEST(ThreadPoolExecutorTest, BugD3527722) {
482   // Test that the queue does not get stuck if writes are completed in
483   // order opposite to how they are initiated.
484   LifoSemMPMCQueue<SlowMover> q(1024);
485   std::atomic<int> turn{};
486
487   std::thread consumer1([&] {
488     ++turn;
489     q.take();
490   });
491   std::thread consumer2([&] {
492     ++turn;
493     q.take();
494   });
495
496   std::thread producer1([&] {
497     ++turn;
498     while (turn < 4) {
499       ;
500     }
501     ++turn;
502     q.add(SlowMover(true));
503   });
504   std::thread producer2([&] {
505     ++turn;
506     while (turn < 5) {
507       ;
508     }
509     q.add(SlowMover(false));
510   });
511
512   producer1.join();
513   producer2.join();
514   consumer1.join();
515   consumer2.join();
516 }
517
518 template <typename TPE, typename ERR_T>
519 static void ShutdownTest() {
520   // test that adding a .then() after we have
521   // started shutting down does not deadlock
522   folly::Optional<folly::Future<int>> f;
523   {
524     TPE fe(1);
525     f = folly::makeFuture().via(&fe).then([]() { burnMs(100)(); }).then([]() {
526       return 77;
527     });
528   }
529   EXPECT_THROW(f->get(), ERR_T);
530 }
531
532 TEST(ThreadPoolExecutorTest, ShutdownTestIO) {
533   ShutdownTest<IOThreadPoolExecutor, std::runtime_error>();
534 }
535
536 TEST(ThreadPoolExecutorTest, ShutdownTestCPU) {
537   ShutdownTest<CPUThreadPoolExecutor, folly::FutureException>();
538 }
539
540 template <typename TPE>
541 static void removeThreadTest() {
542   // test that adding a .then() after we have removed some threads
543   // doesn't cause deadlock and they are executed on different threads
544   folly::Optional<folly::Future<int>> f;
545   std::thread::id id1, id2;
546   TPE fe(2);
547   f = folly::makeFuture()
548           .via(&fe)
549           .then([&id1]() {
550             burnMs(100)();
551             id1 = std::this_thread::get_id();
552           })
553           .then([&id2]() {
554             return 77;
555             id2 = std::this_thread::get_id();
556           });
557   fe.setNumThreads(1);
558
559   // future::then should be fulfilled because there is other thread available
560   EXPECT_EQ(77, f->get());
561   // two thread should be different because then part should be rescheduled to
562   // the other thread
563   EXPECT_NE(id1, id2);
564 }
565
566 TEST(ThreadPoolExecutorTest, RemoveThreadTestIO) {
567   removeThreadTest<IOThreadPoolExecutor>();
568 }
569
570 TEST(ThreadPoolExecutorTest, RemoveThreadTestCPU) {
571   removeThreadTest<CPUThreadPoolExecutor>();
572 }
573
574 template <typename TPE>
575 static void resizeThreadWhileExecutingTest() {
576   TPE tpe(10);
577   EXPECT_EQ(10, tpe.numThreads());
578
579   std::atomic<int> completed(0);
580   auto f = [&]() {
581     burnMs(10)();
582     completed++;
583   };
584   for (int i = 0; i < 1000; i++) {
585     tpe.add(f);
586   }
587   tpe.setNumThreads(8);
588   EXPECT_EQ(8, tpe.numThreads());
589   tpe.setNumThreads(5);
590   EXPECT_EQ(5, tpe.numThreads());
591   tpe.setNumThreads(15);
592   EXPECT_EQ(15, tpe.numThreads());
593   tpe.stop();
594   EXPECT_EQ(1000, completed);
595 }
596
597 TEST(ThreadPoolExecutorTest, resizeThreadWhileExecutingTestIO) {
598   resizeThreadWhileExecutingTest<IOThreadPoolExecutor>();
599 }
600
601 TEST(ThreadPoolExecutorTest, resizeThreadWhileExecutingTestCPU) {
602   resizeThreadWhileExecutingTest<CPUThreadPoolExecutor>();
603 }