2 * Copyright 2017 Facebook, Inc.
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
8 * http://www.apache.org/licenses/LICENSE-2.0
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
20 #include <folly/executors/CPUThreadPoolExecutor.h>
21 #include <folly/executors/FutureExecutor.h>
22 #include <folly/executors/IOThreadPoolExecutor.h>
23 #include <folly/executors/ThreadPoolExecutor.h>
24 #include <folly/executors/task_queue/LifoSemMPMCQueue.h>
25 #include <folly/executors/task_queue/UnboundedBlockingQueue.h>
26 #include <folly/executors/thread_factory/PriorityThreadFactory.h>
27 #include <folly/portability/GTest.h>
29 using namespace folly;
30 using namespace std::chrono;
// Builds a task that puts the calling thread to sleep for `ms` milliseconds.
32 static Func burnMs(uint64_t ms) {
33 return [ms]() { std::this_thread::sleep_for(milliseconds(ms)); };
// Run the basic<> helper (its definition is not part of this excerpt) once per
// executor type.
42 TEST(ThreadPoolExecutorTest, CPUBasic) {
43 basic<CPUThreadPoolExecutor>();
// NOTE(review): this case is registered under IOThreadPoolExecutorTest, while
// the other IO cases in this file use ThreadPoolExecutorTest -- confirm that
// the different suite name is intentional.
46 TEST(IOThreadPoolExecutorTest, IOBasic) {
47 basic<IOThreadPoolExecutor>();
// Checks that numThreads() reflects every setNumThreads() call: the pool is
// expected to report 100 threads at first, 50 after shrinking, and 150 after
// growing again.
51 static void resize() {
53 EXPECT_EQ(100, tpe.numThreads());
54 tpe.setNumThreads(50);
55 EXPECT_EQ(50, tpe.numThreads());
56 tpe.setNumThreads(150);
57 EXPECT_EQ(150, tpe.numThreads());
// Instantiate the resize check for the CPU-bound and the IO-bound executor.
60 TEST(ThreadPoolExecutorTest, CPUResize) {
61 resize<CPUThreadPoolExecutor>();
64 TEST(ThreadPoolExecutorTest, IOResize) {
65 resize<IOThreadPoolExecutor>();
// Fragment of the generic stop() check (its header and loop body are not part
// of this excerpt): after a 1000-iteration loop, `completed` is expected to be
// strictly below 1000.
71 std::atomic<int> completed(0);
76 for (int i = 0; i < 1000; i++) {
80 EXPECT_GT(1000, completed);
83 // IOThreadPoolExecutor's stop() behaves like join(). Outstanding tasks belong
84 // to the event base, will be executed upon its destruction, and cannot be
// Specialization for IOThreadPoolExecutor: uses a single-thread pool and a
// 10-iteration loop, and expects all 10 completions to be counted.
87 void stop<IOThreadPoolExecutor>() {
88 IOThreadPoolExecutor tpe(1);
89 std::atomic<int> completed(0);
94 for (int i = 0; i < 10; i++) {
98 EXPECT_EQ(10, completed);
// Instantiate the stop check for both executor types.
101 TEST(ThreadPoolExecutorTest, CPUStop) {
102 stop<CPUThreadPoolExecutor>();
105 TEST(ThreadPoolExecutorTest, IOStop) {
106 stop<IOThreadPoolExecutor>();
// Fragment of the join() check (its header and loop body are not part of this
// excerpt): after a 1000-iteration loop, `completed` must equal exactly 1000.
112 std::atomic<int> completed(0);
117 for (int i = 0; i < 1000; i++) {
121 EXPECT_EQ(1000, completed);
// Instantiate the join check for both executor types.
124 TEST(ThreadPoolExecutorTest, CPUJoin) {
125 join<CPUThreadPoolExecutor>();
128 TEST(ThreadPoolExecutorTest, IOJoin) {
129 join<IOThreadPoolExecutor>();
// Resizes the pool (down to 5 threads, then up to 15) after a 1000-iteration
// loop whose body is not part of this excerpt, and expects `completed` to
// reach exactly 1000 afterwards.
133 static void resizeUnderLoad() {
135 std::atomic<int> completed(0);
140 for (int i = 0; i < 1000; i++) {
143 tpe.setNumThreads(5);
144 tpe.setNumThreads(15);
146 EXPECT_EQ(1000, completed);
// Instantiate the resize-under-load check for both executor types.
149 TEST(ThreadPoolExecutorTest, CPUResizeUnderLoad) {
150 resizeUnderLoad<CPUThreadPoolExecutor>();
153 TEST(ThreadPoolExecutorTest, IOResizeUnderLoad) {
154 resizeUnderLoad<IOThreadPoolExecutor>();
// Compares two getPoolStats() snapshots of the executor under test. The steps
// between the snapshots, including any use of the two batons, are not part of
// this excerpt.
158 static void poolStats() {
159 folly::Baton<> startBaton, endBaton;
// First snapshot: one thread, idle, with no pending or counted tasks.
161 auto stats = tpe.getPoolStats();
162 EXPECT_EQ(1, stats.threadCount);
163 EXPECT_EQ(1, stats.idleThreadCount);
164 EXPECT_EQ(0, stats.activeThreadCount);
165 EXPECT_EQ(0, stats.pendingTaskCount);
166 EXPECT_EQ(0, tpe.getPendingTaskCount());
167 EXPECT_EQ(0, stats.totalTaskCount);
// Second snapshot: the single thread is active, one task is pending, and two
// tasks are counted in total.
174 stats = tpe.getPoolStats();
175 EXPECT_EQ(1, stats.threadCount);
176 EXPECT_EQ(0, stats.idleThreadCount);
177 EXPECT_EQ(1, stats.activeThreadCount);
178 EXPECT_EQ(1, stats.pendingTaskCount);
179 EXPECT_EQ(1, tpe.getPendingTaskCount());
180 EXPECT_EQ(2, stats.totalTaskCount);
// Instantiate the pool-stats check for both executor types.
184 TEST(ThreadPoolExecutorTest, CPUPoolStats) {
185 poolStats<CPUThreadPoolExecutor>();
188 TEST(ThreadPoolExecutorTest, IOPoolStats) {
189 poolStats<IOThreadPoolExecutor>();
// Subscribes to per-task statistics and checks, inside the callback, that the
// reported runTime and waitTime are greater than zero. The lines surrounding
// the two checks are not part of this excerpt, so whether either check is
// conditional cannot be determined here.
193 static void taskStats() {
195 std::atomic<int> c(0);
196 tpe.subscribeToTaskStats([&](ThreadPoolExecutor::TaskStats stats) {
198 EXPECT_LT(milliseconds(0), stats.runTime);
200 EXPECT_LT(milliseconds(0), stats.waitTime);
// Instantiate the task-stats check for both executor types.
209 TEST(ThreadPoolExecutorTest, CPUTaskStats) {
210 taskStats<CPUThreadPoolExecutor>();
213 TEST(ThreadPoolExecutorTest, IOTaskStats) {
214 taskStats<IOThreadPoolExecutor>();
// Exercises task expiration: two tasks are added with the same expiry
// callback, one with a 60 s expiration and one with a 10 ms expiration.
218 static void expiration() {
// The stats callback numbers its invocations via statCbCount and checks
// stats.expired; the branching that selects between the two expectations is
// not part of this excerpt.
220 std::atomic<int> statCbCount(0);
221 tpe.subscribeToTaskStats([&](ThreadPoolExecutor::TaskStats stats) {
222 int i = statCbCount++;
224 EXPECT_FALSE(stats.expired);
226 EXPECT_TRUE(stats.expired);
// expireCb only counts how many times it is invoked.
231 std::atomic<int> expireCbCount(0);
232 auto expireCb = [&]() { expireCbCount++; };
233 tpe.add(burnMs(10), seconds(60), expireCb);
234 tpe.add(burnMs(10), milliseconds(10), expireCb);
// Both tasks must have produced stats, but only one may have expired.
236 EXPECT_EQ(2, statCbCount);
237 EXPECT_EQ(1, expireCbCount);
// Instantiate the expiration check for both executor types.
240 TEST(ThreadPoolExecutorTest, CPUExpiration) {
241 expiration<CPUThreadPoolExecutor>();
244 TEST(ThreadPoolExecutorTest, IOExpiration) {
245 expiration<IOThreadPoolExecutor>();
// Exercises FutureExecutor<TPE>::addFuture() with callables of different
// return kinds and checks the Try<> delivered to each continuation.
248 template <typename TPE>
249 static void futureExecutor() {
250 FutureExecutor<TPE> fe(2);
251 std::atomic<int> c{0};
// Callable returning a Future<int>.
252 fe.addFuture([]() { return makeFuture<int>(42); }).then([&](Try<int>&& t) {
254 EXPECT_EQ(42, t.value());
// Callable returning a plain int.
256 fe.addFuture([]() { return 100; }).then([&](Try<int>&& t) {
258 EXPECT_EQ(100, t.value());
// Callable returning the result of makeFuture(); the continuation gets a
// Try<Unit>.
260 fe.addFuture([]() { return makeFuture(); }).then([&](Try<Unit>&& t) {
262 EXPECT_NO_THROW(t.value());
// Callable returning void; the continuation also gets a Try<Unit>.
264 fe.addFuture([]() { return; }).then([&](Try<Unit>&& t) {
266 EXPECT_NO_THROW(t.value());
// Callable that throws: the exception must surface from t.value().
268 fe.addFuture([]() { throw std::runtime_error("oops"); })
269 .then([&](Try<Unit>&& t) {
271 EXPECT_THROW(t.value(), std::runtime_error);
273 // Test doing actual async work
274 folly::Baton<> baton;
// The promise is shared with a separate std::thread (its body is not part of
// this excerpt) and its future is returned from the callable.
276 auto p = std::make_shared<Promise<int>>();
277 std::thread t([p]() {
282 return p->getFuture();
284 .then([&](Try<int>&& t) {
285 EXPECT_EQ(42, t.value());
// Instantiate the future-executor check for both executor types.
294 TEST(ThreadPoolExecutorTest, CPUFuturePool) {
295 futureExecutor<CPUThreadPoolExecutor>();
298 TEST(ThreadPoolExecutorTest, IOFuturePool) {
299 futureExecutor<IOThreadPoolExecutor>();
// Queues 50 low-priority and then 50 high-priority tasks before calling
// setNumThreads(1), and expects all 100 to complete. The task bodies (`lopri`,
// `hipri`) and the `completed` counter are defined on lines not part of this
// excerpt.
302 TEST(ThreadPoolExecutorTest, PriorityPreemptionTest) {
303 bool tookLopri = false;
306 EXPECT_FALSE(tookLopri);
// NOTE(review): presumably (0 threads, 2 priority levels), so nothing runs
// until setNumThreads(1) below -- confirm against the constructor.
313 CPUThreadPoolExecutor pool(0, 2);
314 for (int i = 0; i < 50; i++) {
315 pool.addWithPriority(lopri, Executor::LO_PRI);
317 for (int i = 0; i < 50; i++) {
318 pool.addWithPriority(hipri, Executor::HI_PRI);
320 pool.setNumThreads(1);
322 EXPECT_EQ(100, completed);
// ThreadPoolExecutor::Observer used by the observer tests. It overrides all
// four thread callbacks (their bodies are not part of this excerpt) and keeps
// an atomic counter, threads_, that is asserted to be 0 at the end.
325 class TestObserver : public ThreadPoolExecutor::Observer {
327 void threadStarted(ThreadPoolExecutor::ThreadHandle*) override {
330 void threadStopped(ThreadPoolExecutor::ThreadHandle*) override {
333 void threadPreviouslyStarted(ThreadPoolExecutor::ThreadHandle*) override {
336 void threadNotYetStopped(ThreadPoolExecutor::ThreadHandle*) override {
// Final consistency check: threads_ must be back at zero.
340 ASSERT_EQ(threads_, 0);
344 std::atomic<int> threads_{0};
// Attaches a TestObserver to a 10-thread IOThreadPoolExecutor, resizes the
// pool to 3, 0 and 7 threads, detaches the observer, resizes once more to 10,
// and finally runs the observer's checkCalls().
347 TEST(ThreadPoolExecutorTest, IOObserver) {
348 auto observer = std::make_shared<TestObserver>();
351 IOThreadPoolExecutor exe(10);
352 exe.addObserver(observer);
353 exe.setNumThreads(3);
354 exe.setNumThreads(0);
355 exe.setNumThreads(7);
356 exe.removeObserver(observer);
// This resize happens after the observer has been removed.
357 exe.setNumThreads(10);
360 observer->checkCalls();
// Attaches a TestObserver to a 10-thread CPUThreadPoolExecutor, resizes the
// pool to 3, 0 and 7 threads, detaches the observer, resizes once more to 10,
// and finally runs the observer's checkCalls().
363 TEST(ThreadPoolExecutorTest, CPUObserver) {
364 auto observer = std::make_shared<TestObserver>();
367 CPUThreadPoolExecutor exe(10);
368 exe.addObserver(observer);
369 exe.setNumThreads(3);
370 exe.setNumThreads(0);
371 exe.setNumThreads(7);
372 exe.removeObserver(observer);
// This resize happens after the observer has been removed.
373 exe.setNumThreads(10);
376 observer->checkCalls();
// Checks addWithPriority() on both executor types: the IO executor is expected
// to throw std::runtime_error, while the CPU executor is called with in-range,
// out-of-range and named priorities.
379 TEST(ThreadPoolExecutorTest, AddWithPriority) {
// Each submitted task just bumps the shared counter.
380 std::atomic_int c{0};
381 auto f = [&] { c++; };
383 // IO exe doesn't support priorities
384 IOThreadPoolExecutor ioExe(10);
385 EXPECT_THROW(ioExe.addWithPriority(f, 0), std::runtime_error);
387 CPUThreadPoolExecutor cpuExe(10, 3);
388 cpuExe.addWithPriority(f, -1);
389 cpuExe.addWithPriority(f, 0);
390 cpuExe.addWithPriority(f, 1);
391 cpuExe.addWithPriority(f, -2); // will add at the lowest priority
392 cpuExe.addWithPriority(f, 2); // will add at the highest priority
393 cpuExe.addWithPriority(f, Executor::LO_PRI);
394 cpuExe.addWithPriority(f, Executor::HI_PRI);
// Checks that add() does not throw on a CPUThreadPoolExecutor backed by a
// capacity-1 LifoSemMPMCQueue configured with QueueBehaviorIfFull::BLOCK.
400 TEST(ThreadPoolExecutorTest, BlockingQueue) {
401 std::atomic_int c{0};
406 const int kQueueCapacity = 1;
407 const int kThreads = 1;
409 auto queue = std::make_unique<LifoSemMPMCQueue<
410 CPUThreadPoolExecutor::CPUTask,
411 QueueBehaviorIfFull::BLOCK>>(kQueueCapacity);
413 CPUThreadPoolExecutor cpuExe(
416 std::make_shared<NamedThreadFactory>("CPUThreadPool"));
418 // Add `f` five times. It sleeps for 1ms every time. Calling
419 // `cpuExe.add()` is *almost* guaranteed to block because there's
420 // only 1 cpu worker thread.
421 for (int i = 0; i < 5; i++) {
422 EXPECT_NO_THROW(cpuExe.add(f));
// Creates a thread through a PriorityThreadFactory configured with priority 1
// and checks that getpriority(PRIO_PROCESS, 0), called on that thread, reports
// 1.
429 TEST(PriorityThreadFactoryTest, ThreadPriority) {
430 PriorityThreadFactory factory(
431 std::make_shared<NamedThreadFactory>("stuff"), 1);
// Initial value differs from the expected result, so the check below fails if
// the thread never writes it.
432 int actualPriority = -21;
433 factory.newThread([&]() { actualPriority = getpriority(PRIO_PROCESS, 0); })
435 EXPECT_EQ(1, actualPriority);
// folly::RequestData subclass that carries a single int (stored in data_),
// used by the RequestContext test.
438 class TestData : public folly::RequestData {
440 explicit TestData(int data) : data_(data) {}
441 ~TestData() override {}
// Override of RequestData::hasCallback(); its body is not part of this
// excerpt.
443 bool hasCallback() override {
// Stores a TestData(42) under the key "test" in a fresh request context and
// checks that it can be read back with the same value, both right away and
// from a second lookup further down (the code around that second lookup is not
// part of this excerpt).
450 TEST(ThreadPoolExecutorTest, RequestContext) {
451 CPUThreadPoolExecutor executor(1);
453 RequestContextScopeGuard rctx; // create new request context for this scope
// The key must be absent before it is set.
454 EXPECT_EQ(nullptr, RequestContext::get()->getContextData("test"));
455 RequestContext::get()->setContextData("test", std::make_unique<TestData>(42));
456 auto data = RequestContext::get()->getContextData("test");
457 EXPECT_EQ(42, dynamic_cast<TestData*>(data)->data_);
// Second lookup: ASSERT guards the dereference that follows.
460 auto data = RequestContext::get()->getContextData("test");
461 ASSERT_TRUE(data != nullptr);
462 EXPECT_EQ(42, dynamic_cast<TestData*>(data)->data_);
// Fragment of SlowMover (the struct header is not part of this excerpt): a
// movable type carrying a `slow` flag, whose move assignment contains a 50 ms
// sleep. The condition guarding that sleep is not visible here.
467 explicit SlowMover(bool slow = false) : slow(slow) {}
// Move construction is implemented in terms of move assignment.
468 SlowMover(SlowMover&& other) noexcept {
469 *this = std::move(other);
471 SlowMover& operator=(SlowMover&& other) noexcept {
474 /* sleep override */ std::this_thread::sleep_for(milliseconds(50));
// Regression check for queue type Q using two consumer and two producer
// threads; producer1 adds a SlowMover(true) and producer2 a SlowMover(false).
// The thread bodies and the use of `turn` are only partly visible in this
// excerpt.
482 template <typename Q>
483 void bugD3527722_test() {
484 // Test that the queue does not get stuck if writes are completed in
485 // order opposite to how they are initiated.
487 std::atomic<int> turn{};
489 std::thread consumer1([&] {
493 std::thread consumer2([&] {
498 std::thread producer1([&] {
504 q.add(SlowMover(true));
506 std::thread producer2([&] {
511 q.add(SlowMover(false));
// Run the regression check against LifoSemMPMCQueue.
520 TEST(ThreadPoolExecutorTest, LifoSemMPMCQueueBugD3527722) {
521 bugD3527722_test<LifoSemMPMCQueue<SlowMover>>();
// Wrapper deriving from UnboundedBlockingQueue<T> so it can be passed to
// bugD3527722_test; the struct body is not part of this excerpt.
524 template <typename T>
525 struct UBQ : public UnboundedBlockingQueue<T> {
// Run the same regression check against the unbounded blocking queue.
529 TEST(ThreadPoolExecutorTest, UnboundedBlockingQueueBugD3527722) {
530 bugD3527722_test<UBQ<SlowMover>>();
// Template parameters: TPE is the executor type under test, ERR_T the
// exception type that f->get() is expected to throw.
533 template <typename TPE, typename ERR_T>
534 static void ShutdownTest() {
535 // test that adding a .then() after we have
536 // started shutting down does not deadlock
537 folly::Optional<folly::Future<int>> f;
// The first continuation burns 100 ms on the executor `fe` (declared on a line
// not part of this excerpt).
540 f = folly::makeFuture().via(&fe).then([]() { burnMs(100)(); }).then([]() {
544 EXPECT_THROW(f->get(), ERR_T);
// The expected exception type differs per executor: std::runtime_error for IO,
// folly::FutureException for CPU.
547 TEST(ThreadPoolExecutorTest, ShutdownTestIO) {
548 ShutdownTest<IOThreadPoolExecutor, std::runtime_error>();
551 TEST(ThreadPoolExecutorTest, ShutdownTestCPU) {
552 ShutdownTest<CPUThreadPoolExecutor, folly::FutureException>();
// Records the thread id seen by two stages of a future chain (id1, id2) and
// expects the chain to resolve to 77. The chain itself and the final id
// comparison are only partly visible in this excerpt.
555 template <typename TPE>
556 static void removeThreadTest() {
557 // test that adding a .then() after we have removed some threads
558 // doesn't cause deadlock and they are executed on different threads
559 folly::Optional<folly::Future<int>> f;
560 std::thread::id id1, id2;
562 f = folly::makeFuture()
566 id1 = std::this_thread::get_id();
570 id2 = std::this_thread::get_id();
574 // future::then should be fulfilled because there is another thread available
575 EXPECT_EQ(77, f->get());
576 // the two threads should be different because the then part should be rescheduled to
// Instantiate the remove-thread check for both executor types.
581 TEST(ThreadPoolExecutorTest, RemoveThreadTestIO) {
582 removeThreadTest<IOThreadPoolExecutor>();
585 TEST(ThreadPoolExecutorTest, RemoveThreadTestCPU) {
586 removeThreadTest<CPUThreadPoolExecutor>();
// Resizes the pool from 10 threads to 8, 5 and then 15 after a 1000-iteration
// loop whose body is not part of this excerpt. numThreads() is checked after
// every resize, and `completed` is expected to reach exactly 1000.
589 template <typename TPE>
590 static void resizeThreadWhileExecutingTest() {
592 EXPECT_EQ(10, tpe.numThreads());
594 std::atomic<int> completed(0);
599 for (int i = 0; i < 1000; i++) {
602 tpe.setNumThreads(8);
603 EXPECT_EQ(8, tpe.numThreads());
604 tpe.setNumThreads(5);
605 EXPECT_EQ(5, tpe.numThreads());
606 tpe.setNumThreads(15);
607 EXPECT_EQ(15, tpe.numThreads());
609 EXPECT_EQ(1000, completed);
// Instantiate the resize-while-executing check for both executor types.
612 TEST(ThreadPoolExecutorTest, resizeThreadWhileExecutingTestIO) {
613 resizeThreadWhileExecutingTest<IOThreadPoolExecutor>();
616 TEST(ThreadPoolExecutorTest, resizeThreadWhileExecutingTestCPU) {
617 resizeThreadWhileExecutingTest<CPUThreadPoolExecutor>();