move wangle/concurrent to folly/executors
[folly.git] / folly / executors / test / ThreadPoolExecutorTest.cpp
1 /*
2  * Copyright 2017 Facebook, Inc.
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  *   http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16
17 #include <thread>
18
19 #include <folly/executors/CPUThreadPoolExecutor.h>
20 #include <folly/executors/FutureExecutor.h>
21 #include <folly/executors/IOThreadPoolExecutor.h>
22 #include <folly/executors/LifoSemMPMCQueue.h>
23 #include <folly/executors/PriorityThreadFactory.h>
24 #include <folly/executors/ThreadPoolExecutor.h>
25 #include <gtest/gtest.h>
26
27 using namespace folly;
28 using namespace std::chrono;
29
30 static Func burnMs(uint64_t ms) {
31   return [ms]() { std::this_thread::sleep_for(milliseconds(ms)); };
32 }
33
34 template <class TPE>
35 static void basic() {
36   // Create and destroy
37   TPE tpe(10);
38 }
39
40 TEST(ThreadPoolExecutorTest, CPUBasic) {
41   basic<CPUThreadPoolExecutor>();
42 }
43
44 TEST(IOThreadPoolExecutorTest, IOBasic) {
45   basic<IOThreadPoolExecutor>();
46 }
47
48 template <class TPE>
49 static void resize() {
50   TPE tpe(100);
51   EXPECT_EQ(100, tpe.numThreads());
52   tpe.setNumThreads(50);
53   EXPECT_EQ(50, tpe.numThreads());
54   tpe.setNumThreads(150);
55   EXPECT_EQ(150, tpe.numThreads());
56 }
57
58 TEST(ThreadPoolExecutorTest, CPUResize) {
59   resize<CPUThreadPoolExecutor>();
60 }
61
62 TEST(ThreadPoolExecutorTest, IOResize) {
63   resize<IOThreadPoolExecutor>();
64 }
65
66 template <class TPE>
67 static void stop() {
68   TPE tpe(1);
69   std::atomic<int> completed(0);
70   auto f = [&]() {
71     burnMs(10)();
72     completed++;
73   };
74   for (int i = 0; i < 1000; i++) {
75     tpe.add(f);
76   }
77   tpe.stop();
78   EXPECT_GT(1000, completed);
79 }
80
81 // IOThreadPoolExecutor's stop() behaves like join(). Outstanding tasks belong
82 // to the event base, will be executed upon its destruction, and cannot be
83 // taken back.
84 template <>
85 void stop<IOThreadPoolExecutor>() {
86   IOThreadPoolExecutor tpe(1);
87   std::atomic<int> completed(0);
88   auto f = [&]() {
89     burnMs(10)();
90     completed++;
91   };
92   for (int i = 0; i < 10; i++) {
93     tpe.add(f);
94   }
95   tpe.stop();
96   EXPECT_EQ(10, completed);
97 }
98
99 TEST(ThreadPoolExecutorTest, CPUStop) {
100   stop<CPUThreadPoolExecutor>();
101 }
102
103 TEST(ThreadPoolExecutorTest, IOStop) {
104   stop<IOThreadPoolExecutor>();
105 }
106
107 template <class TPE>
108 static void join() {
109   TPE tpe(10);
110   std::atomic<int> completed(0);
111   auto f = [&]() {
112     burnMs(1)();
113     completed++;
114   };
115   for (int i = 0; i < 1000; i++) {
116     tpe.add(f);
117   }
118   tpe.join();
119   EXPECT_EQ(1000, completed);
120 }
121
122 TEST(ThreadPoolExecutorTest, CPUJoin) {
123   join<CPUThreadPoolExecutor>();
124 }
125
126 TEST(ThreadPoolExecutorTest, IOJoin) {
127   join<IOThreadPoolExecutor>();
128 }
129
130 template <class TPE>
131 static void resizeUnderLoad() {
132   TPE tpe(10);
133   std::atomic<int> completed(0);
134   auto f = [&]() {
135     burnMs(1)();
136     completed++;
137   };
138   for (int i = 0; i < 1000; i++) {
139     tpe.add(f);
140   }
141   tpe.setNumThreads(5);
142   tpe.setNumThreads(15);
143   tpe.join();
144   EXPECT_EQ(1000, completed);
145 }
146
147 TEST(ThreadPoolExecutorTest, CPUResizeUnderLoad) {
148   resizeUnderLoad<CPUThreadPoolExecutor>();
149 }
150
151 TEST(ThreadPoolExecutorTest, IOResizeUnderLoad) {
152   resizeUnderLoad<IOThreadPoolExecutor>();
153 }
154
155 template <class TPE>
156 static void poolStats() {
157   folly::Baton<> startBaton, endBaton;
158   TPE tpe(1);
159   auto stats = tpe.getPoolStats();
160   EXPECT_EQ(1, stats.threadCount);
161   EXPECT_EQ(1, stats.idleThreadCount);
162   EXPECT_EQ(0, stats.activeThreadCount);
163   EXPECT_EQ(0, stats.pendingTaskCount);
164   EXPECT_EQ(0, tpe.getPendingTaskCount());
165   EXPECT_EQ(0, stats.totalTaskCount);
166   tpe.add([&]() {
167     startBaton.post();
168     endBaton.wait();
169   });
170   tpe.add([&]() {});
171   startBaton.wait();
172   stats = tpe.getPoolStats();
173   EXPECT_EQ(1, stats.threadCount);
174   EXPECT_EQ(0, stats.idleThreadCount);
175   EXPECT_EQ(1, stats.activeThreadCount);
176   EXPECT_EQ(1, stats.pendingTaskCount);
177   EXPECT_EQ(1, tpe.getPendingTaskCount());
178   EXPECT_EQ(2, stats.totalTaskCount);
179   endBaton.post();
180 }
181
182 TEST(ThreadPoolExecutorTest, CPUPoolStats) {
183   poolStats<CPUThreadPoolExecutor>();
184 }
185
186 TEST(ThreadPoolExecutorTest, IOPoolStats) {
187   poolStats<IOThreadPoolExecutor>();
188 }
189
190 template <class TPE>
191 static void taskStats() {
192   TPE tpe(1);
193   std::atomic<int> c(0);
194   tpe.subscribeToTaskStats([&](ThreadPoolExecutor::TaskStats stats) {
195     int i = c++;
196     EXPECT_LT(milliseconds(0), stats.runTime);
197     if (i == 1) {
198       EXPECT_LT(milliseconds(0), stats.waitTime);
199     }
200   });
201   tpe.add(burnMs(10));
202   tpe.add(burnMs(10));
203   tpe.join();
204   EXPECT_EQ(2, c);
205 }
206
207 TEST(ThreadPoolExecutorTest, CPUTaskStats) {
208   taskStats<CPUThreadPoolExecutor>();
209 }
210
211 TEST(ThreadPoolExecutorTest, IOTaskStats) {
212   taskStats<IOThreadPoolExecutor>();
213 }
214
215 template <class TPE>
216 static void expiration() {
217   TPE tpe(1);
218   std::atomic<int> statCbCount(0);
219   tpe.subscribeToTaskStats([&](ThreadPoolExecutor::TaskStats stats) {
220     int i = statCbCount++;
221     if (i == 0) {
222       EXPECT_FALSE(stats.expired);
223     } else if (i == 1) {
224       EXPECT_TRUE(stats.expired);
225     } else {
226       FAIL();
227     }
228   });
229   std::atomic<int> expireCbCount(0);
230   auto expireCb = [&]() { expireCbCount++; };
231   tpe.add(burnMs(10), seconds(60), expireCb);
232   tpe.add(burnMs(10), milliseconds(10), expireCb);
233   tpe.join();
234   EXPECT_EQ(2, statCbCount);
235   EXPECT_EQ(1, expireCbCount);
236 }
237
238 TEST(ThreadPoolExecutorTest, CPUExpiration) {
239   expiration<CPUThreadPoolExecutor>();
240 }
241
242 TEST(ThreadPoolExecutorTest, IOExpiration) {
243   expiration<IOThreadPoolExecutor>();
244 }
245
246 template <typename TPE>
247 static void futureExecutor() {
248   FutureExecutor<TPE> fe(2);
249   std::atomic<int> c{0};
250   fe.addFuture([]() { return makeFuture<int>(42); }).then([&](Try<int>&& t) {
251     c++;
252     EXPECT_EQ(42, t.value());
253   });
254   fe.addFuture([]() { return 100; }).then([&](Try<int>&& t) {
255     c++;
256     EXPECT_EQ(100, t.value());
257   });
258   fe.addFuture([]() { return makeFuture(); }).then([&](Try<Unit>&& t) {
259     c++;
260     EXPECT_NO_THROW(t.value());
261   });
262   fe.addFuture([]() { return; }).then([&](Try<Unit>&& t) {
263     c++;
264     EXPECT_NO_THROW(t.value());
265   });
266   fe.addFuture([]() { throw std::runtime_error("oops"); })
267       .then([&](Try<Unit>&& t) {
268         c++;
269         EXPECT_THROW(t.value(), std::runtime_error);
270       });
271   // Test doing actual async work
272   folly::Baton<> baton;
273   fe.addFuture([&]() {
274       auto p = std::make_shared<Promise<int>>();
275       std::thread t([p]() {
276         burnMs(10)();
277         p->setValue(42);
278       });
279       t.detach();
280       return p->getFuture();
281     })
282       .then([&](Try<int>&& t) {
283         EXPECT_EQ(42, t.value());
284         c++;
285         baton.post();
286       });
287   baton.wait();
288   fe.join();
289   EXPECT_EQ(6, c);
290 }
291
292 TEST(ThreadPoolExecutorTest, CPUFuturePool) {
293   futureExecutor<CPUThreadPoolExecutor>();
294 }
295
296 TEST(ThreadPoolExecutorTest, IOFuturePool) {
297   futureExecutor<IOThreadPoolExecutor>();
298 }
299
300 TEST(ThreadPoolExecutorTest, PriorityPreemptionTest) {
301   bool tookLopri = false;
302   auto completed = 0;
303   auto hipri = [&] {
304     EXPECT_FALSE(tookLopri);
305     completed++;
306   };
307   auto lopri = [&] {
308     tookLopri = true;
309     completed++;
310   };
311   CPUThreadPoolExecutor pool(0, 2);
312   for (int i = 0; i < 50; i++) {
313     pool.addWithPriority(lopri, Executor::LO_PRI);
314   }
315   for (int i = 0; i < 50; i++) {
316     pool.addWithPriority(hipri, Executor::HI_PRI);
317   }
318   pool.setNumThreads(1);
319   pool.join();
320   EXPECT_EQ(100, completed);
321 }
322
323 class TestObserver : public ThreadPoolExecutor::Observer {
324  public:
325   void threadStarted(ThreadPoolExecutor::ThreadHandle*) override {
326     threads_++;
327   }
328   void threadStopped(ThreadPoolExecutor::ThreadHandle*) override {
329     threads_--;
330   }
331   void threadPreviouslyStarted(ThreadPoolExecutor::ThreadHandle*) override {
332     threads_++;
333   }
334   void threadNotYetStopped(ThreadPoolExecutor::ThreadHandle*) override {
335     threads_--;
336   }
337   void checkCalls() {
338     ASSERT_EQ(threads_, 0);
339   }
340
341  private:
342   std::atomic<int> threads_{0};
343 };
344
345 TEST(ThreadPoolExecutorTest, IOObserver) {
346   auto observer = std::make_shared<TestObserver>();
347
348   {
349     IOThreadPoolExecutor exe(10);
350     exe.addObserver(observer);
351     exe.setNumThreads(3);
352     exe.setNumThreads(0);
353     exe.setNumThreads(7);
354     exe.removeObserver(observer);
355     exe.setNumThreads(10);
356   }
357
358   observer->checkCalls();
359 }
360
361 TEST(ThreadPoolExecutorTest, CPUObserver) {
362   auto observer = std::make_shared<TestObserver>();
363
364   {
365     CPUThreadPoolExecutor exe(10);
366     exe.addObserver(observer);
367     exe.setNumThreads(3);
368     exe.setNumThreads(0);
369     exe.setNumThreads(7);
370     exe.removeObserver(observer);
371     exe.setNumThreads(10);
372   }
373
374   observer->checkCalls();
375 }
376
377 TEST(ThreadPoolExecutorTest, AddWithPriority) {
378   std::atomic_int c{0};
379   auto f = [&] { c++; };
380
381   // IO exe doesn't support priorities
382   IOThreadPoolExecutor ioExe(10);
383   EXPECT_THROW(ioExe.addWithPriority(f, 0), std::runtime_error);
384
385   CPUThreadPoolExecutor cpuExe(10, 3);
386   cpuExe.addWithPriority(f, -1);
387   cpuExe.addWithPriority(f, 0);
388   cpuExe.addWithPriority(f, 1);
389   cpuExe.addWithPriority(f, -2); // will add at the lowest priority
390   cpuExe.addWithPriority(f, 2); // will add at the highest priority
391   cpuExe.addWithPriority(f, Executor::LO_PRI);
392   cpuExe.addWithPriority(f, Executor::HI_PRI);
393   cpuExe.join();
394
395   EXPECT_EQ(7, c);
396 }
397
398 TEST(ThreadPoolExecutorTest, BlockingQueue) {
399   std::atomic_int c{0};
400   auto f = [&] {
401     burnMs(1)();
402     c++;
403   };
404   const int kQueueCapacity = 1;
405   const int kThreads = 1;
406
407   auto queue = std::make_unique<LifoSemMPMCQueue<
408       CPUThreadPoolExecutor::CPUTask,
409       QueueBehaviorIfFull::BLOCK>>(kQueueCapacity);
410
411   CPUThreadPoolExecutor cpuExe(
412       kThreads,
413       std::move(queue),
414       std::make_shared<NamedThreadFactory>("CPUThreadPool"));
415
416   // Add `f` five times. It sleeps for 1ms every time. Calling
417   // `cppExec.add()` is *almost* guaranteed to block because there's
418   // only 1 cpu worker thread.
419   for (int i = 0; i < 5; i++) {
420     EXPECT_NO_THROW(cpuExe.add(f));
421   }
422   cpuExe.join();
423
424   EXPECT_EQ(5, c);
425 }
426
427 TEST(PriorityThreadFactoryTest, ThreadPriority) {
428   PriorityThreadFactory factory(
429       std::make_shared<NamedThreadFactory>("stuff"), 1);
430   int actualPriority = -21;
431   factory.newThread([&]() { actualPriority = getpriority(PRIO_PROCESS, 0); })
432       .join();
433   EXPECT_EQ(1, actualPriority);
434 }
435
436 class TestData : public folly::RequestData {
437  public:
438   explicit TestData(int data) : data_(data) {}
439   ~TestData() override {}
440   int data_;
441 };
442
443 TEST(ThreadPoolExecutorTest, RequestContext) {
444   CPUThreadPoolExecutor executor(1);
445
446   RequestContextScopeGuard rctx; // create new request context for this scope
447   EXPECT_EQ(nullptr, RequestContext::get()->getContextData("test"));
448   RequestContext::get()->setContextData(
449       "test", std::unique_ptr<TestData>(new TestData(42)));
450   auto data = RequestContext::get()->getContextData("test");
451   EXPECT_EQ(42, dynamic_cast<TestData*>(data)->data_);
452
453   executor.add([] {
454     auto data = RequestContext::get()->getContextData("test");
455     ASSERT_TRUE(data != nullptr);
456     EXPECT_EQ(42, dynamic_cast<TestData*>(data)->data_);
457   });
458 }
459
460 struct SlowMover {
461   explicit SlowMover(bool slow = false) : slow(slow) {}
462   SlowMover(SlowMover&& other) noexcept {
463     *this = std::move(other);
464   }
465   SlowMover& operator=(SlowMover&& other) noexcept {
466     slow = other.slow;
467     if (slow) {
468       /* sleep override */ std::this_thread::sleep_for(milliseconds(50));
469     }
470     return *this;
471   }
472
473   bool slow;
474 };
475
476 TEST(ThreadPoolExecutorTest, BugD3527722) {
477   // Test that the queue does not get stuck if writes are completed in
478   // order opposite to how they are initiated.
479   LifoSemMPMCQueue<SlowMover> q(1024);
480   std::atomic<int> turn{};
481
482   std::thread consumer1([&] {
483     ++turn;
484     q.take();
485   });
486   std::thread consumer2([&] {
487     ++turn;
488     q.take();
489   });
490
491   std::thread producer1([&] {
492     ++turn;
493     while (turn < 4)
494       ;
495     ++turn;
496     q.add(SlowMover(true));
497   });
498   std::thread producer2([&] {
499     ++turn;
500     while (turn < 5)
501       ;
502     q.add(SlowMover(false));
503   });
504
505   producer1.join();
506   producer2.join();
507   consumer1.join();
508   consumer2.join();
509 }
510
511 template <typename TPE, typename ERR_T>
512 static void ShutdownTest() {
513   // test that adding a .then() after we have
514   // started shutting down does not deadlock
515   folly::Optional<folly::Future<int>> f;
516   {
517     TPE fe(1);
518     f = folly::makeFuture().via(&fe).then([]() { burnMs(100)(); }).then([]() {
519       return 77;
520     });
521   }
522   EXPECT_THROW(f->get(), ERR_T);
523 }
524
525 TEST(ThreadPoolExecutorTest, ShutdownTestIO) {
526   ShutdownTest<IOThreadPoolExecutor, std::runtime_error>();
527 }
528
529 TEST(ThreadPoolExecutorTest, ShutdownTestCPU) {
530   ShutdownTest<CPUThreadPoolExecutor, folly::FutureException>();
531 }
532
533 template <typename TPE>
534 static void removeThreadTest() {
535   // test that adding a .then() after we have removed some threads
536   // doesn't cause deadlock and they are executed on different threads
537   folly::Optional<folly::Future<int>> f;
538   std::thread::id id1, id2;
539   TPE fe(2);
540   f = folly::makeFuture()
541           .via(&fe)
542           .then([&id1]() {
543             burnMs(100)();
544             id1 = std::this_thread::get_id();
545           })
546           .then([&id2]() {
547             return 77;
548             id2 = std::this_thread::get_id();
549           });
550   fe.setNumThreads(1);
551
552   // future::then should be fulfilled because there is other thread available
553   EXPECT_EQ(77, f->get());
554   // two thread should be different because then part should be rescheduled to
555   // the other thread
556   EXPECT_NE(id1, id2);
557 }
558
559 TEST(ThreadPoolExecutorTest, RemoveThreadTestIO) {
560   removeThreadTest<IOThreadPoolExecutor>();
561 }
562
563 TEST(ThreadPoolExecutorTest, RemoveThreadTestCPU) {
564   removeThreadTest<CPUThreadPoolExecutor>();
565 }
566
567 template <typename TPE>
568 static void resizeThreadWhileExecutingTest() {
569   TPE tpe(10);
570   EXPECT_EQ(10, tpe.numThreads());
571
572   std::atomic<int> completed(0);
573   auto f = [&]() {
574     burnMs(10)();
575     completed++;
576   };
577   for (int i = 0; i < 1000; i++) {
578     tpe.add(f);
579   }
580   tpe.setNumThreads(8);
581   EXPECT_EQ(8, tpe.numThreads());
582   tpe.setNumThreads(5);
583   EXPECT_EQ(5, tpe.numThreads());
584   tpe.setNumThreads(15);
585   EXPECT_EQ(15, tpe.numThreads());
586   tpe.stop();
587   EXPECT_EQ(1000, completed);
588 }
589
590 TEST(ThreadPoolExecutorTest, resizeThreadWhileExecutingTestIO) {
591   resizeThreadWhileExecutingTest<IOThreadPoolExecutor>();
592 }
593
594 TEST(ThreadPoolExecutorTest, resizeThreadWhileExecutingTestCPU) {
595   resizeThreadWhileExecutingTest<CPUThreadPoolExecutor>();
596 }