Fix copyright lines
[folly.git] / folly / executors / test / ThreadPoolExecutorTest.cpp
1 /*
2  * Copyright 2017-present Facebook, Inc.
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  *   http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16
17 #include <memory>
18 #include <thread>
19
20 #include <folly/executors/CPUThreadPoolExecutor.h>
21 #include <folly/executors/FutureExecutor.h>
22 #include <folly/executors/IOThreadPoolExecutor.h>
23 #include <folly/executors/ThreadPoolExecutor.h>
24 #include <folly/executors/task_queue/LifoSemMPMCQueue.h>
25 #include <folly/executors/task_queue/UnboundedBlockingQueue.h>
26 #include <folly/executors/thread_factory/PriorityThreadFactory.h>
27 #include <folly/portability/GTest.h>
28
29 using namespace folly;
30 using namespace std::chrono;
31
32 static Func burnMs(uint64_t ms) {
33   return [ms]() { std::this_thread::sleep_for(milliseconds(ms)); };
34 }
35
36 template <class TPE>
37 static void basic() {
38   // Create and destroy
39   TPE tpe(10);
40 }
41
42 TEST(ThreadPoolExecutorTest, CPUBasic) {
43   basic<CPUThreadPoolExecutor>();
44 }
45
46 TEST(IOThreadPoolExecutorTest, IOBasic) {
47   basic<IOThreadPoolExecutor>();
48 }
49
50 template <class TPE>
51 static void resize() {
52   TPE tpe(100);
53   EXPECT_EQ(100, tpe.numThreads());
54   tpe.setNumThreads(50);
55   EXPECT_EQ(50, tpe.numThreads());
56   tpe.setNumThreads(150);
57   EXPECT_EQ(150, tpe.numThreads());
58 }
59
60 TEST(ThreadPoolExecutorTest, CPUResize) {
61   resize<CPUThreadPoolExecutor>();
62 }
63
64 TEST(ThreadPoolExecutorTest, IOResize) {
65   resize<IOThreadPoolExecutor>();
66 }
67
68 template <class TPE>
69 static void stop() {
70   TPE tpe(1);
71   std::atomic<int> completed(0);
72   auto f = [&]() {
73     burnMs(10)();
74     completed++;
75   };
76   for (int i = 0; i < 1000; i++) {
77     tpe.add(f);
78   }
79   tpe.stop();
80   EXPECT_GT(1000, completed);
81 }
82
83 // IOThreadPoolExecutor's stop() behaves like join(). Outstanding tasks belong
84 // to the event base, will be executed upon its destruction, and cannot be
85 // taken back.
86 template <>
87 void stop<IOThreadPoolExecutor>() {
88   IOThreadPoolExecutor tpe(1);
89   std::atomic<int> completed(0);
90   auto f = [&]() {
91     burnMs(10)();
92     completed++;
93   };
94   for (int i = 0; i < 10; i++) {
95     tpe.add(f);
96   }
97   tpe.stop();
98   EXPECT_EQ(10, completed);
99 }
100
101 TEST(ThreadPoolExecutorTest, CPUStop) {
102   stop<CPUThreadPoolExecutor>();
103 }
104
105 TEST(ThreadPoolExecutorTest, IOStop) {
106   stop<IOThreadPoolExecutor>();
107 }
108
109 template <class TPE>
110 static void join() {
111   TPE tpe(10);
112   std::atomic<int> completed(0);
113   auto f = [&]() {
114     burnMs(1)();
115     completed++;
116   };
117   for (int i = 0; i < 1000; i++) {
118     tpe.add(f);
119   }
120   tpe.join();
121   EXPECT_EQ(1000, completed);
122 }
123
124 TEST(ThreadPoolExecutorTest, CPUJoin) {
125   join<CPUThreadPoolExecutor>();
126 }
127
128 TEST(ThreadPoolExecutorTest, IOJoin) {
129   join<IOThreadPoolExecutor>();
130 }
131
132 template <class TPE>
133 static void resizeUnderLoad() {
134   TPE tpe(10);
135   std::atomic<int> completed(0);
136   auto f = [&]() {
137     burnMs(1)();
138     completed++;
139   };
140   for (int i = 0; i < 1000; i++) {
141     tpe.add(f);
142   }
143   tpe.setNumThreads(5);
144   tpe.setNumThreads(15);
145   tpe.join();
146   EXPECT_EQ(1000, completed);
147 }
148
149 TEST(ThreadPoolExecutorTest, CPUResizeUnderLoad) {
150   resizeUnderLoad<CPUThreadPoolExecutor>();
151 }
152
153 TEST(ThreadPoolExecutorTest, IOResizeUnderLoad) {
154   resizeUnderLoad<IOThreadPoolExecutor>();
155 }
156
157 template <class TPE>
158 static void poolStats() {
159   folly::Baton<> startBaton, endBaton;
160   TPE tpe(1);
161   auto stats = tpe.getPoolStats();
162   EXPECT_EQ(1, stats.threadCount);
163   EXPECT_EQ(1, stats.idleThreadCount);
164   EXPECT_EQ(0, stats.activeThreadCount);
165   EXPECT_EQ(0, stats.pendingTaskCount);
166   EXPECT_EQ(0, tpe.getPendingTaskCount());
167   EXPECT_EQ(0, stats.totalTaskCount);
168   tpe.add([&]() {
169     startBaton.post();
170     endBaton.wait();
171   });
172   tpe.add([&]() {});
173   startBaton.wait();
174   stats = tpe.getPoolStats();
175   EXPECT_EQ(1, stats.threadCount);
176   EXPECT_EQ(0, stats.idleThreadCount);
177   EXPECT_EQ(1, stats.activeThreadCount);
178   EXPECT_EQ(1, stats.pendingTaskCount);
179   EXPECT_EQ(1, tpe.getPendingTaskCount());
180   EXPECT_EQ(2, stats.totalTaskCount);
181   endBaton.post();
182 }
183
184 TEST(ThreadPoolExecutorTest, CPUPoolStats) {
185   poolStats<CPUThreadPoolExecutor>();
186 }
187
188 TEST(ThreadPoolExecutorTest, IOPoolStats) {
189   poolStats<IOThreadPoolExecutor>();
190 }
191
192 template <class TPE>
193 static void taskStats() {
194   TPE tpe(1);
195   std::atomic<int> c(0);
196   tpe.subscribeToTaskStats([&](ThreadPoolExecutor::TaskStats stats) {
197     int i = c++;
198     EXPECT_LT(milliseconds(0), stats.runTime);
199     if (i == 1) {
200       EXPECT_LT(milliseconds(0), stats.waitTime);
201     }
202   });
203   tpe.add(burnMs(10));
204   tpe.add(burnMs(10));
205   tpe.join();
206   EXPECT_EQ(2, c);
207 }
208
209 TEST(ThreadPoolExecutorTest, CPUTaskStats) {
210   taskStats<CPUThreadPoolExecutor>();
211 }
212
213 TEST(ThreadPoolExecutorTest, IOTaskStats) {
214   taskStats<IOThreadPoolExecutor>();
215 }
216
217 template <class TPE>
218 static void expiration() {
219   TPE tpe(1);
220   std::atomic<int> statCbCount(0);
221   tpe.subscribeToTaskStats([&](ThreadPoolExecutor::TaskStats stats) {
222     int i = statCbCount++;
223     if (i == 0) {
224       EXPECT_FALSE(stats.expired);
225     } else if (i == 1) {
226       EXPECT_TRUE(stats.expired);
227     } else {
228       FAIL();
229     }
230   });
231   std::atomic<int> expireCbCount(0);
232   auto expireCb = [&]() { expireCbCount++; };
233   tpe.add(burnMs(10), seconds(60), expireCb);
234   tpe.add(burnMs(10), milliseconds(10), expireCb);
235   tpe.join();
236   EXPECT_EQ(2, statCbCount);
237   EXPECT_EQ(1, expireCbCount);
238 }
239
240 TEST(ThreadPoolExecutorTest, CPUExpiration) {
241   expiration<CPUThreadPoolExecutor>();
242 }
243
244 TEST(ThreadPoolExecutorTest, IOExpiration) {
245   expiration<IOThreadPoolExecutor>();
246 }
247
248 template <typename TPE>
249 static void futureExecutor() {
250   FutureExecutor<TPE> fe(2);
251   std::atomic<int> c{0};
252   fe.addFuture([]() { return makeFuture<int>(42); }).then([&](Try<int>&& t) {
253     c++;
254     EXPECT_EQ(42, t.value());
255   });
256   fe.addFuture([]() { return 100; }).then([&](Try<int>&& t) {
257     c++;
258     EXPECT_EQ(100, t.value());
259   });
260   fe.addFuture([]() { return makeFuture(); }).then([&](Try<Unit>&& t) {
261     c++;
262     EXPECT_NO_THROW(t.value());
263   });
264   fe.addFuture([]() { return; }).then([&](Try<Unit>&& t) {
265     c++;
266     EXPECT_NO_THROW(t.value());
267   });
268   fe.addFuture([]() { throw std::runtime_error("oops"); })
269       .then([&](Try<Unit>&& t) {
270         c++;
271         EXPECT_THROW(t.value(), std::runtime_error);
272       });
273   // Test doing actual async work
274   folly::Baton<> baton;
275   fe.addFuture([&]() {
276       auto p = std::make_shared<Promise<int>>();
277       std::thread t([p]() {
278         burnMs(10)();
279         p->setValue(42);
280       });
281       t.detach();
282       return p->getFuture();
283     })
284       .then([&](Try<int>&& t) {
285         EXPECT_EQ(42, t.value());
286         c++;
287         baton.post();
288       });
289   baton.wait();
290   fe.join();
291   EXPECT_EQ(6, c);
292 }
293
294 TEST(ThreadPoolExecutorTest, CPUFuturePool) {
295   futureExecutor<CPUThreadPoolExecutor>();
296 }
297
298 TEST(ThreadPoolExecutorTest, IOFuturePool) {
299   futureExecutor<IOThreadPoolExecutor>();
300 }
301
302 TEST(ThreadPoolExecutorTest, PriorityPreemptionTest) {
303   bool tookLopri = false;
304   auto completed = 0;
305   auto hipri = [&] {
306     EXPECT_FALSE(tookLopri);
307     completed++;
308   };
309   auto lopri = [&] {
310     tookLopri = true;
311     completed++;
312   };
313   CPUThreadPoolExecutor pool(0, 2);
314   for (int i = 0; i < 50; i++) {
315     pool.addWithPriority(lopri, Executor::LO_PRI);
316   }
317   for (int i = 0; i < 50; i++) {
318     pool.addWithPriority(hipri, Executor::HI_PRI);
319   }
320   pool.setNumThreads(1);
321   pool.join();
322   EXPECT_EQ(100, completed);
323 }
324
325 class TestObserver : public ThreadPoolExecutor::Observer {
326  public:
327   void threadStarted(ThreadPoolExecutor::ThreadHandle*) override {
328     threads_++;
329   }
330   void threadStopped(ThreadPoolExecutor::ThreadHandle*) override {
331     threads_--;
332   }
333   void threadPreviouslyStarted(ThreadPoolExecutor::ThreadHandle*) override {
334     threads_++;
335   }
336   void threadNotYetStopped(ThreadPoolExecutor::ThreadHandle*) override {
337     threads_--;
338   }
339   void checkCalls() {
340     ASSERT_EQ(threads_, 0);
341   }
342
343  private:
344   std::atomic<int> threads_{0};
345 };
346
347 TEST(ThreadPoolExecutorTest, IOObserver) {
348   auto observer = std::make_shared<TestObserver>();
349
350   {
351     IOThreadPoolExecutor exe(10);
352     exe.addObserver(observer);
353     exe.setNumThreads(3);
354     exe.setNumThreads(0);
355     exe.setNumThreads(7);
356     exe.removeObserver(observer);
357     exe.setNumThreads(10);
358   }
359
360   observer->checkCalls();
361 }
362
363 TEST(ThreadPoolExecutorTest, CPUObserver) {
364   auto observer = std::make_shared<TestObserver>();
365
366   {
367     CPUThreadPoolExecutor exe(10);
368     exe.addObserver(observer);
369     exe.setNumThreads(3);
370     exe.setNumThreads(0);
371     exe.setNumThreads(7);
372     exe.removeObserver(observer);
373     exe.setNumThreads(10);
374   }
375
376   observer->checkCalls();
377 }
378
379 TEST(ThreadPoolExecutorTest, AddWithPriority) {
380   std::atomic_int c{0};
381   auto f = [&] { c++; };
382
383   // IO exe doesn't support priorities
384   IOThreadPoolExecutor ioExe(10);
385   EXPECT_THROW(ioExe.addWithPriority(f, 0), std::runtime_error);
386
387   CPUThreadPoolExecutor cpuExe(10, 3);
388   cpuExe.addWithPriority(f, -1);
389   cpuExe.addWithPriority(f, 0);
390   cpuExe.addWithPriority(f, 1);
391   cpuExe.addWithPriority(f, -2); // will add at the lowest priority
392   cpuExe.addWithPriority(f, 2); // will add at the highest priority
393   cpuExe.addWithPriority(f, Executor::LO_PRI);
394   cpuExe.addWithPriority(f, Executor::HI_PRI);
395   cpuExe.join();
396
397   EXPECT_EQ(7, c);
398 }
399
400 TEST(ThreadPoolExecutorTest, BlockingQueue) {
401   std::atomic_int c{0};
402   auto f = [&] {
403     burnMs(1)();
404     c++;
405   };
406   const int kQueueCapacity = 1;
407   const int kThreads = 1;
408
409   auto queue = std::make_unique<LifoSemMPMCQueue<
410       CPUThreadPoolExecutor::CPUTask,
411       QueueBehaviorIfFull::BLOCK>>(kQueueCapacity);
412
413   CPUThreadPoolExecutor cpuExe(
414       kThreads,
415       std::move(queue),
416       std::make_shared<NamedThreadFactory>("CPUThreadPool"));
417
418   // Add `f` five times. It sleeps for 1ms every time. Calling
419   // `cppExec.add()` is *almost* guaranteed to block because there's
420   // only 1 cpu worker thread.
421   for (int i = 0; i < 5; i++) {
422     EXPECT_NO_THROW(cpuExe.add(f));
423   }
424   cpuExe.join();
425
426   EXPECT_EQ(5, c);
427 }
428
429 TEST(PriorityThreadFactoryTest, ThreadPriority) {
430   PriorityThreadFactory factory(
431       std::make_shared<NamedThreadFactory>("stuff"), 1);
432   int actualPriority = -21;
433   factory.newThread([&]() { actualPriority = getpriority(PRIO_PROCESS, 0); })
434       .join();
435   EXPECT_EQ(1, actualPriority);
436 }
437
438 class TestData : public folly::RequestData {
439  public:
440   explicit TestData(int data) : data_(data) {}
441   ~TestData() override {}
442
443   bool hasCallback() override {
444     return false;
445   }
446
447   int data_;
448 };
449
450 TEST(ThreadPoolExecutorTest, RequestContext) {
451   CPUThreadPoolExecutor executor(1);
452
453   RequestContextScopeGuard rctx; // create new request context for this scope
454   EXPECT_EQ(nullptr, RequestContext::get()->getContextData("test"));
455   RequestContext::get()->setContextData("test", std::make_unique<TestData>(42));
456   auto data = RequestContext::get()->getContextData("test");
457   EXPECT_EQ(42, dynamic_cast<TestData*>(data)->data_);
458
459   executor.add([] {
460     auto data = RequestContext::get()->getContextData("test");
461     ASSERT_TRUE(data != nullptr);
462     EXPECT_EQ(42, dynamic_cast<TestData*>(data)->data_);
463   });
464 }
465
466 struct SlowMover {
467   explicit SlowMover(bool slow = false) : slow(slow) {}
468   SlowMover(SlowMover&& other) noexcept {
469     *this = std::move(other);
470   }
471   SlowMover& operator=(SlowMover&& other) noexcept {
472     slow = other.slow;
473     if (slow) {
474       /* sleep override */ std::this_thread::sleep_for(milliseconds(50));
475     }
476     return *this;
477   }
478
479   bool slow;
480 };
481
482 template <typename Q>
483 void bugD3527722_test() {
484   // Test that the queue does not get stuck if writes are completed in
485   // order opposite to how they are initiated.
486   Q q(1024);
487   std::atomic<int> turn{};
488
489   std::thread consumer1([&] {
490     ++turn;
491     q.take();
492   });
493   std::thread consumer2([&] {
494     ++turn;
495     q.take();
496   });
497
498   std::thread producer1([&] {
499     ++turn;
500     while (turn < 4) {
501       ;
502     }
503     ++turn;
504     q.add(SlowMover(true));
505   });
506   std::thread producer2([&] {
507     ++turn;
508     while (turn < 5) {
509       ;
510     }
511     q.add(SlowMover(false));
512   });
513
514   producer1.join();
515   producer2.join();
516   consumer1.join();
517   consumer2.join();
518 }
519
520 TEST(ThreadPoolExecutorTest, LifoSemMPMCQueueBugD3527722) {
521   bugD3527722_test<LifoSemMPMCQueue<SlowMover>>();
522 }
523
524 template <typename T>
525 struct UBQ : public UnboundedBlockingQueue<T> {
526   explicit UBQ(int) {}
527 };
528
529 TEST(ThreadPoolExecutorTest, UnboundedBlockingQueueBugD3527722) {
530   bugD3527722_test<UBQ<SlowMover>>();
531 }
532
533 template <typename TPE, typename ERR_T>
534 static void ShutdownTest() {
535   // test that adding a .then() after we have
536   // started shutting down does not deadlock
537   folly::Optional<folly::Future<int>> f;
538   {
539     TPE fe(1);
540     f = folly::makeFuture().via(&fe).then([]() { burnMs(100)(); }).then([]() {
541       return 77;
542     });
543   }
544   EXPECT_THROW(f->get(), ERR_T);
545 }
546
547 TEST(ThreadPoolExecutorTest, ShutdownTestIO) {
548   ShutdownTest<IOThreadPoolExecutor, std::runtime_error>();
549 }
550
551 TEST(ThreadPoolExecutorTest, ShutdownTestCPU) {
552   ShutdownTest<CPUThreadPoolExecutor, folly::FutureException>();
553 }
554
555 template <typename TPE>
556 static void removeThreadTest() {
557   // test that adding a .then() after we have removed some threads
558   // doesn't cause deadlock and they are executed on different threads
559   folly::Optional<folly::Future<int>> f;
560   std::thread::id id1, id2;
561   TPE fe(2);
562   f = folly::makeFuture()
563           .via(&fe)
564           .then([&id1]() {
565             burnMs(100)();
566             id1 = std::this_thread::get_id();
567           })
568           .then([&id2]() {
569             return 77;
570             id2 = std::this_thread::get_id();
571           });
572   fe.setNumThreads(1);
573
574   // future::then should be fulfilled because there is other thread available
575   EXPECT_EQ(77, f->get());
576   // two thread should be different because then part should be rescheduled to
577   // the other thread
578   EXPECT_NE(id1, id2);
579 }
580
581 TEST(ThreadPoolExecutorTest, RemoveThreadTestIO) {
582   removeThreadTest<IOThreadPoolExecutor>();
583 }
584
585 TEST(ThreadPoolExecutorTest, RemoveThreadTestCPU) {
586   removeThreadTest<CPUThreadPoolExecutor>();
587 }
588
589 template <typename TPE>
590 static void resizeThreadWhileExecutingTest() {
591   TPE tpe(10);
592   EXPECT_EQ(10, tpe.numThreads());
593
594   std::atomic<int> completed(0);
595   auto f = [&]() {
596     burnMs(10)();
597     completed++;
598   };
599   for (int i = 0; i < 1000; i++) {
600     tpe.add(f);
601   }
602   tpe.setNumThreads(8);
603   EXPECT_EQ(8, tpe.numThreads());
604   tpe.setNumThreads(5);
605   EXPECT_EQ(5, tpe.numThreads());
606   tpe.setNumThreads(15);
607   EXPECT_EQ(15, tpe.numThreads());
608   tpe.stop();
609   EXPECT_EQ(1000, completed);
610 }
611
612 TEST(ThreadPoolExecutorTest, resizeThreadWhileExecutingTestIO) {
613   resizeThreadWhileExecutingTest<IOThreadPoolExecutor>();
614 }
615
616 TEST(ThreadPoolExecutorTest, resizeThreadWhileExecutingTestCPU) {
617   resizeThreadWhileExecutingTest<CPUThreadPoolExecutor>();
618 }