2 * Copyright 2014 Facebook, Inc.
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
8 * http://www.apache.org/licenses/LICENSE-2.0
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
17 #include <folly/experimental/wangle/concurrent/ThreadPoolExecutor.h>
18 #include <folly/experimental/wangle/concurrent/CPUThreadPoolExecutor.h>
19 #include <folly/experimental/wangle/concurrent/IOThreadPoolExecutor.h>
20 #include <glog/logging.h>
21 #include <gtest/gtest.h>
23 using namespace folly::wangle;
24 using namespace std::chrono;
26 static Func burnMs(uint64_t ms) {
27 return [ms]() { std::this_thread::sleep_for(milliseconds(ms)); };
36 TEST(ThreadPoolExecutorTest, CPUBasic) {
37 basic<CPUThreadPoolExecutor>();
40 TEST(IOThreadPoolExecutorTest, IOBasic) {
41 basic<IOThreadPoolExecutor>();
45 static void resize() {
47 EXPECT_EQ(100, tpe.numThreads());
48 tpe.setNumThreads(50);
49 EXPECT_EQ(50, tpe.numThreads());
50 tpe.setNumThreads(150);
51 EXPECT_EQ(150, tpe.numThreads());
54 TEST(ThreadPoolExecutorTest, CPUResize) {
55 resize<CPUThreadPoolExecutor>();
58 TEST(ThreadPoolExecutorTest, IOResize) {
59 resize<IOThreadPoolExecutor>();
65 std::atomic<int> completed(0);
70 for (int i = 0; i < 1000; i++) {
74 EXPECT_GT(1000, completed);
77 TEST(ThreadPoolExecutorTest, CPUStop) {
78 stop<CPUThreadPoolExecutor>();
81 TEST(ThreadPoolExecutorTest, IOStop) {
82 stop<IOThreadPoolExecutor>();
88 std::atomic<int> completed(0);
93 for (int i = 0; i < 1000; i++) {
97 EXPECT_EQ(1000, completed);
100 TEST(ThreadPoolExecutorTest, CPUJoin) {
101 join<CPUThreadPoolExecutor>();
104 TEST(ThreadPoolExecutorTest, IOJoin) {
105 join<IOThreadPoolExecutor>();
109 static void resizeUnderLoad() {
111 std::atomic<int> completed(0);
116 for (int i = 0; i < 1000; i++) {
119 tpe.setNumThreads(5);
120 tpe.setNumThreads(15);
122 EXPECT_EQ(1000, completed);
125 TEST(ThreadPoolExecutorTest, CPUResizeUnderLoad) {
126 resizeUnderLoad<CPUThreadPoolExecutor>();
129 TEST(ThreadPoolExecutorTest, IOResizeUnderLoad) {
130 resizeUnderLoad<IOThreadPoolExecutor>();
134 static void poolStats() {
135 folly::Baton<> startBaton, endBaton;
137 auto stats = tpe.getPoolStats();
138 EXPECT_EQ(1, stats.threadCount);
139 EXPECT_EQ(1, stats.idleThreadCount);
140 EXPECT_EQ(0, stats.activeThreadCount);
141 EXPECT_EQ(0, stats.pendingTaskCount);
142 EXPECT_EQ(0, stats.totalTaskCount);
143 tpe.add([&](){ startBaton.post(); endBaton.wait(); });
146 stats = tpe.getPoolStats();
147 EXPECT_EQ(1, stats.threadCount);
148 EXPECT_EQ(0, stats.idleThreadCount);
149 EXPECT_EQ(1, stats.activeThreadCount);
150 EXPECT_EQ(1, stats.pendingTaskCount);
151 EXPECT_EQ(2, stats.totalTaskCount);
155 TEST(ThreadPoolExecutorTest, CPUPoolStats) {
156 poolStats<CPUThreadPoolExecutor>();
159 TEST(ThreadPoolExecutorTest, IOPoolStats) {
160 poolStats<IOThreadPoolExecutor>();
164 static void taskStats() {
166 std::atomic<int> c(0);
167 tpe.subscribeToTaskStats(Observer<ThreadPoolExecutor::TaskStats>::create(
168 [&] (ThreadPoolExecutor::TaskStats stats) {
170 EXPECT_LT(milliseconds(0), stats.runTime);
172 EXPECT_LT(milliseconds(0), stats.waitTime);
181 TEST(ThreadPoolExecutorTest, CPUTaskStats) {
182 taskStats<CPUThreadPoolExecutor>();
185 TEST(ThreadPoolExecutorTest, IOTaskStats) {
186 taskStats<IOThreadPoolExecutor>();
190 static void expiration() {
192 std::atomic<int> statCbCount(0);
193 tpe.subscribeToTaskStats(Observer<ThreadPoolExecutor::TaskStats>::create(
194 [&] (ThreadPoolExecutor::TaskStats stats) {
195 int i = statCbCount++;
197 EXPECT_FALSE(stats.expired);
199 EXPECT_TRUE(stats.expired);
204 std::atomic<int> expireCbCount(0);
205 auto expireCb = [&] () { expireCbCount++; };
206 tpe.add(burnMs(10), seconds(60), expireCb);
207 tpe.add(burnMs(10), milliseconds(10), expireCb);
209 EXPECT_EQ(2, statCbCount);
210 EXPECT_EQ(1, expireCbCount);
213 TEST(ThreadPoolExecutorTest, CPUExpiration) {
214 expiration<CPUThreadPoolExecutor>();
217 TEST(ThreadPoolExecutorTest, IOExpiration) {
218 expiration<IOThreadPoolExecutor>();