/*
 * Copyright 2017 Facebook, Inc.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *   http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
20 #include <folly/executors/CPUThreadPoolExecutor.h>
21 #include <folly/executors/FutureExecutor.h>
22 #include <folly/executors/IOThreadPoolExecutor.h>
23 #include <folly/executors/ThreadPoolExecutor.h>
24 #include <folly/executors/task_queue/LifoSemMPMCQueue.h>
25 #include <folly/executors/thread_factory/PriorityThreadFactory.h>
26 #include <folly/portability/GTest.h>
28 using namespace folly;
29 using namespace std::chrono;
31 static Func burnMs(uint64_t ms) {
32 return [ms]() { std::this_thread::sleep_for(milliseconds(ms)); };
41 TEST(ThreadPoolExecutorTest, CPUBasic) {
42 basic<CPUThreadPoolExecutor>();
45 TEST(IOThreadPoolExecutorTest, IOBasic) {
46 basic<IOThreadPoolExecutor>();
50 static void resize() {
52 EXPECT_EQ(100, tpe.numThreads());
53 tpe.setNumThreads(50);
54 EXPECT_EQ(50, tpe.numThreads());
55 tpe.setNumThreads(150);
56 EXPECT_EQ(150, tpe.numThreads());
59 TEST(ThreadPoolExecutorTest, CPUResize) {
60 resize<CPUThreadPoolExecutor>();
63 TEST(ThreadPoolExecutorTest, IOResize) {
64 resize<IOThreadPoolExecutor>();
70 std::atomic<int> completed(0);
75 for (int i = 0; i < 1000; i++) {
79 EXPECT_GT(1000, completed);
82 // IOThreadPoolExecutor's stop() behaves like join(). Outstanding tasks belong
83 // to the event base, will be executed upon its destruction, and cannot be
86 void stop<IOThreadPoolExecutor>() {
87 IOThreadPoolExecutor tpe(1);
88 std::atomic<int> completed(0);
93 for (int i = 0; i < 10; i++) {
97 EXPECT_EQ(10, completed);
100 TEST(ThreadPoolExecutorTest, CPUStop) {
101 stop<CPUThreadPoolExecutor>();
104 TEST(ThreadPoolExecutorTest, IOStop) {
105 stop<IOThreadPoolExecutor>();
111 std::atomic<int> completed(0);
116 for (int i = 0; i < 1000; i++) {
120 EXPECT_EQ(1000, completed);
123 TEST(ThreadPoolExecutorTest, CPUJoin) {
124 join<CPUThreadPoolExecutor>();
127 TEST(ThreadPoolExecutorTest, IOJoin) {
128 join<IOThreadPoolExecutor>();
132 static void resizeUnderLoad() {
134 std::atomic<int> completed(0);
139 for (int i = 0; i < 1000; i++) {
142 tpe.setNumThreads(5);
143 tpe.setNumThreads(15);
145 EXPECT_EQ(1000, completed);
148 TEST(ThreadPoolExecutorTest, CPUResizeUnderLoad) {
149 resizeUnderLoad<CPUThreadPoolExecutor>();
152 TEST(ThreadPoolExecutorTest, IOResizeUnderLoad) {
153 resizeUnderLoad<IOThreadPoolExecutor>();
157 static void poolStats() {
158 folly::Baton<> startBaton, endBaton;
160 auto stats = tpe.getPoolStats();
161 EXPECT_EQ(1, stats.threadCount);
162 EXPECT_EQ(1, stats.idleThreadCount);
163 EXPECT_EQ(0, stats.activeThreadCount);
164 EXPECT_EQ(0, stats.pendingTaskCount);
165 EXPECT_EQ(0, tpe.getPendingTaskCount());
166 EXPECT_EQ(0, stats.totalTaskCount);
173 stats = tpe.getPoolStats();
174 EXPECT_EQ(1, stats.threadCount);
175 EXPECT_EQ(0, stats.idleThreadCount);
176 EXPECT_EQ(1, stats.activeThreadCount);
177 EXPECT_EQ(1, stats.pendingTaskCount);
178 EXPECT_EQ(1, tpe.getPendingTaskCount());
179 EXPECT_EQ(2, stats.totalTaskCount);
183 TEST(ThreadPoolExecutorTest, CPUPoolStats) {
184 poolStats<CPUThreadPoolExecutor>();
187 TEST(ThreadPoolExecutorTest, IOPoolStats) {
188 poolStats<IOThreadPoolExecutor>();
192 static void taskStats() {
194 std::atomic<int> c(0);
195 tpe.subscribeToTaskStats([&](ThreadPoolExecutor::TaskStats stats) {
197 EXPECT_LT(milliseconds(0), stats.runTime);
199 EXPECT_LT(milliseconds(0), stats.waitTime);
208 TEST(ThreadPoolExecutorTest, CPUTaskStats) {
209 taskStats<CPUThreadPoolExecutor>();
212 TEST(ThreadPoolExecutorTest, IOTaskStats) {
213 taskStats<IOThreadPoolExecutor>();
217 static void expiration() {
219 std::atomic<int> statCbCount(0);
220 tpe.subscribeToTaskStats([&](ThreadPoolExecutor::TaskStats stats) {
221 int i = statCbCount++;
223 EXPECT_FALSE(stats.expired);
225 EXPECT_TRUE(stats.expired);
230 std::atomic<int> expireCbCount(0);
231 auto expireCb = [&]() { expireCbCount++; };
232 tpe.add(burnMs(10), seconds(60), expireCb);
233 tpe.add(burnMs(10), milliseconds(10), expireCb);
235 EXPECT_EQ(2, statCbCount);
236 EXPECT_EQ(1, expireCbCount);
239 TEST(ThreadPoolExecutorTest, CPUExpiration) {
240 expiration<CPUThreadPoolExecutor>();
243 TEST(ThreadPoolExecutorTest, IOExpiration) {
244 expiration<IOThreadPoolExecutor>();
247 template <typename TPE>
248 static void futureExecutor() {
249 FutureExecutor<TPE> fe(2);
250 std::atomic<int> c{0};
251 fe.addFuture([]() { return makeFuture<int>(42); }).then([&](Try<int>&& t) {
253 EXPECT_EQ(42, t.value());
255 fe.addFuture([]() { return 100; }).then([&](Try<int>&& t) {
257 EXPECT_EQ(100, t.value());
259 fe.addFuture([]() { return makeFuture(); }).then([&](Try<Unit>&& t) {
261 EXPECT_NO_THROW(t.value());
263 fe.addFuture([]() { return; }).then([&](Try<Unit>&& t) {
265 EXPECT_NO_THROW(t.value());
267 fe.addFuture([]() { throw std::runtime_error("oops"); })
268 .then([&](Try<Unit>&& t) {
270 EXPECT_THROW(t.value(), std::runtime_error);
272 // Test doing actual async work
273 folly::Baton<> baton;
275 auto p = std::make_shared<Promise<int>>();
276 std::thread t([p]() {
281 return p->getFuture();
283 .then([&](Try<int>&& t) {
284 EXPECT_EQ(42, t.value());
293 TEST(ThreadPoolExecutorTest, CPUFuturePool) {
294 futureExecutor<CPUThreadPoolExecutor>();
297 TEST(ThreadPoolExecutorTest, IOFuturePool) {
298 futureExecutor<IOThreadPoolExecutor>();
301 TEST(ThreadPoolExecutorTest, PriorityPreemptionTest) {
302 bool tookLopri = false;
305 EXPECT_FALSE(tookLopri);
312 CPUThreadPoolExecutor pool(0, 2);
313 for (int i = 0; i < 50; i++) {
314 pool.addWithPriority(lopri, Executor::LO_PRI);
316 for (int i = 0; i < 50; i++) {
317 pool.addWithPriority(hipri, Executor::HI_PRI);
319 pool.setNumThreads(1);
321 EXPECT_EQ(100, completed);
324 class TestObserver : public ThreadPoolExecutor::Observer {
326 void threadStarted(ThreadPoolExecutor::ThreadHandle*) override {
329 void threadStopped(ThreadPoolExecutor::ThreadHandle*) override {
332 void threadPreviouslyStarted(ThreadPoolExecutor::ThreadHandle*) override {
335 void threadNotYetStopped(ThreadPoolExecutor::ThreadHandle*) override {
339 ASSERT_EQ(threads_, 0);
343 std::atomic<int> threads_{0};
346 TEST(ThreadPoolExecutorTest, IOObserver) {
347 auto observer = std::make_shared<TestObserver>();
350 IOThreadPoolExecutor exe(10);
351 exe.addObserver(observer);
352 exe.setNumThreads(3);
353 exe.setNumThreads(0);
354 exe.setNumThreads(7);
355 exe.removeObserver(observer);
356 exe.setNumThreads(10);
359 observer->checkCalls();
362 TEST(ThreadPoolExecutorTest, CPUObserver) {
363 auto observer = std::make_shared<TestObserver>();
366 CPUThreadPoolExecutor exe(10);
367 exe.addObserver(observer);
368 exe.setNumThreads(3);
369 exe.setNumThreads(0);
370 exe.setNumThreads(7);
371 exe.removeObserver(observer);
372 exe.setNumThreads(10);
375 observer->checkCalls();
378 TEST(ThreadPoolExecutorTest, AddWithPriority) {
379 std::atomic_int c{0};
380 auto f = [&] { c++; };
382 // IO exe doesn't support priorities
383 IOThreadPoolExecutor ioExe(10);
384 EXPECT_THROW(ioExe.addWithPriority(f, 0), std::runtime_error);
386 CPUThreadPoolExecutor cpuExe(10, 3);
387 cpuExe.addWithPriority(f, -1);
388 cpuExe.addWithPriority(f, 0);
389 cpuExe.addWithPriority(f, 1);
390 cpuExe.addWithPriority(f, -2); // will add at the lowest priority
391 cpuExe.addWithPriority(f, 2); // will add at the highest priority
392 cpuExe.addWithPriority(f, Executor::LO_PRI);
393 cpuExe.addWithPriority(f, Executor::HI_PRI);
399 TEST(ThreadPoolExecutorTest, BlockingQueue) {
400 std::atomic_int c{0};
405 const int kQueueCapacity = 1;
406 const int kThreads = 1;
408 auto queue = std::make_unique<LifoSemMPMCQueue<
409 CPUThreadPoolExecutor::CPUTask,
410 QueueBehaviorIfFull::BLOCK>>(kQueueCapacity);
412 CPUThreadPoolExecutor cpuExe(
415 std::make_shared<NamedThreadFactory>("CPUThreadPool"));
417 // Add `f` five times. It sleeps for 1ms every time. Calling
418 // `cppExec.add()` is *almost* guaranteed to block because there's
419 // only 1 cpu worker thread.
420 for (int i = 0; i < 5; i++) {
421 EXPECT_NO_THROW(cpuExe.add(f));
428 TEST(PriorityThreadFactoryTest, ThreadPriority) {
429 PriorityThreadFactory factory(
430 std::make_shared<NamedThreadFactory>("stuff"), 1);
431 int actualPriority = -21;
432 factory.newThread([&]() { actualPriority = getpriority(PRIO_PROCESS, 0); })
434 EXPECT_EQ(1, actualPriority);
437 class TestData : public folly::RequestData {
439 explicit TestData(int data) : data_(data) {}
440 ~TestData() override {}
442 bool hasCallback() override {
449 TEST(ThreadPoolExecutorTest, RequestContext) {
450 CPUThreadPoolExecutor executor(1);
452 RequestContextScopeGuard rctx; // create new request context for this scope
453 EXPECT_EQ(nullptr, RequestContext::get()->getContextData("test"));
454 RequestContext::get()->setContextData("test", std::make_unique<TestData>(42));
455 auto data = RequestContext::get()->getContextData("test");
456 EXPECT_EQ(42, dynamic_cast<TestData*>(data)->data_);
459 auto data = RequestContext::get()->getContextData("test");
460 ASSERT_TRUE(data != nullptr);
461 EXPECT_EQ(42, dynamic_cast<TestData*>(data)->data_);
466 explicit SlowMover(bool slow = false) : slow(slow) {}
467 SlowMover(SlowMover&& other) noexcept {
468 *this = std::move(other);
470 SlowMover& operator=(SlowMover&& other) noexcept {
473 /* sleep override */ std::this_thread::sleep_for(milliseconds(50));
481 TEST(ThreadPoolExecutorTest, BugD3527722) {
482 // Test that the queue does not get stuck if writes are completed in
483 // order opposite to how they are initiated.
484 LifoSemMPMCQueue<SlowMover> q(1024);
485 std::atomic<int> turn{};
487 std::thread consumer1([&] {
491 std::thread consumer2([&] {
496 std::thread producer1([&] {
502 q.add(SlowMover(true));
504 std::thread producer2([&] {
509 q.add(SlowMover(false));
518 template <typename TPE, typename ERR_T>
519 static void ShutdownTest() {
520 // test that adding a .then() after we have
521 // started shutting down does not deadlock
522 folly::Optional<folly::Future<int>> f;
525 f = folly::makeFuture().via(&fe).then([]() { burnMs(100)(); }).then([]() {
529 EXPECT_THROW(f->get(), ERR_T);
532 TEST(ThreadPoolExecutorTest, ShutdownTestIO) {
533 ShutdownTest<IOThreadPoolExecutor, std::runtime_error>();
536 TEST(ThreadPoolExecutorTest, ShutdownTestCPU) {
537 ShutdownTest<CPUThreadPoolExecutor, folly::FutureException>();
540 template <typename TPE>
541 static void removeThreadTest() {
542 // test that adding a .then() after we have removed some threads
543 // doesn't cause deadlock and they are executed on different threads
544 folly::Optional<folly::Future<int>> f;
545 std::thread::id id1, id2;
547 f = folly::makeFuture()
551 id1 = std::this_thread::get_id();
555 id2 = std::this_thread::get_id();
559 // future::then should be fulfilled because there is other thread available
560 EXPECT_EQ(77, f->get());
561 // two thread should be different because then part should be rescheduled to
566 TEST(ThreadPoolExecutorTest, RemoveThreadTestIO) {
567 removeThreadTest<IOThreadPoolExecutor>();
570 TEST(ThreadPoolExecutorTest, RemoveThreadTestCPU) {
571 removeThreadTest<CPUThreadPoolExecutor>();
574 template <typename TPE>
575 static void resizeThreadWhileExecutingTest() {
577 EXPECT_EQ(10, tpe.numThreads());
579 std::atomic<int> completed(0);
584 for (int i = 0; i < 1000; i++) {
587 tpe.setNumThreads(8);
588 EXPECT_EQ(8, tpe.numThreads());
589 tpe.setNumThreads(5);
590 EXPECT_EQ(5, tpe.numThreads());
591 tpe.setNumThreads(15);
592 EXPECT_EQ(15, tpe.numThreads());
594 EXPECT_EQ(1000, completed);
597 TEST(ThreadPoolExecutorTest, resizeThreadWhileExecutingTestIO) {
598 resizeThreadWhileExecutingTest<IOThreadPoolExecutor>();
601 TEST(ThreadPoolExecutorTest, resizeThreadWhileExecutingTestCPU) {
602 resizeThreadWhileExecutingTest<CPUThreadPoolExecutor>();