2 * Copyright 2014 Facebook, Inc.
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
8 * http://www.apache.org/licenses/LICENSE-2.0
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
17 #include <folly/experimental/wangle/concurrent/FutureExecutor.h>
18 #include <folly/experimental/wangle/concurrent/ThreadPoolExecutor.h>
19 #include <folly/experimental/wangle/concurrent/CPUThreadPoolExecutor.h>
20 #include <folly/experimental/wangle/concurrent/IOThreadPoolExecutor.h>
21 #include <glog/logging.h>
22 #include <gtest/gtest.h>
24 using namespace folly::wangle;
25 using namespace std::chrono;
27 static Func burnMs(uint64_t ms) {
28 return [ms]() { std::this_thread::sleep_for(milliseconds(ms)); };
37 TEST(ThreadPoolExecutorTest, CPUBasic) {
38 basic<CPUThreadPoolExecutor>();
41 TEST(IOThreadPoolExecutorTest, IOBasic) {
42 basic<IOThreadPoolExecutor>();
46 static void resize() {
48 EXPECT_EQ(100, tpe.numThreads());
49 tpe.setNumThreads(50);
50 EXPECT_EQ(50, tpe.numThreads());
51 tpe.setNumThreads(150);
52 EXPECT_EQ(150, tpe.numThreads());
55 TEST(ThreadPoolExecutorTest, CPUResize) {
56 resize<CPUThreadPoolExecutor>();
59 TEST(ThreadPoolExecutorTest, IOResize) {
60 resize<IOThreadPoolExecutor>();
66 std::atomic<int> completed(0);
71 for (int i = 0; i < 1000; i++) {
75 EXPECT_GT(1000, completed);
78 TEST(ThreadPoolExecutorTest, CPUStop) {
79 stop<CPUThreadPoolExecutor>();
82 TEST(ThreadPoolExecutorTest, IOStop) {
83 stop<IOThreadPoolExecutor>();
89 std::atomic<int> completed(0);
94 for (int i = 0; i < 1000; i++) {
98 EXPECT_EQ(1000, completed);
101 TEST(ThreadPoolExecutorTest, CPUJoin) {
102 join<CPUThreadPoolExecutor>();
105 TEST(ThreadPoolExecutorTest, IOJoin) {
106 join<IOThreadPoolExecutor>();
110 static void resizeUnderLoad() {
112 std::atomic<int> completed(0);
117 for (int i = 0; i < 1000; i++) {
120 tpe.setNumThreads(5);
121 tpe.setNumThreads(15);
123 EXPECT_EQ(1000, completed);
126 TEST(ThreadPoolExecutorTest, CPUResizeUnderLoad) {
127 resizeUnderLoad<CPUThreadPoolExecutor>();
130 TEST(ThreadPoolExecutorTest, IOResizeUnderLoad) {
131 resizeUnderLoad<IOThreadPoolExecutor>();
135 static void poolStats() {
136 folly::Baton<> startBaton, endBaton;
138 auto stats = tpe.getPoolStats();
139 EXPECT_EQ(1, stats.threadCount);
140 EXPECT_EQ(1, stats.idleThreadCount);
141 EXPECT_EQ(0, stats.activeThreadCount);
142 EXPECT_EQ(0, stats.pendingTaskCount);
143 EXPECT_EQ(0, stats.totalTaskCount);
144 tpe.add([&](){ startBaton.post(); endBaton.wait(); });
147 stats = tpe.getPoolStats();
148 EXPECT_EQ(1, stats.threadCount);
149 EXPECT_EQ(0, stats.idleThreadCount);
150 EXPECT_EQ(1, stats.activeThreadCount);
151 EXPECT_EQ(1, stats.pendingTaskCount);
152 EXPECT_EQ(2, stats.totalTaskCount);
156 TEST(ThreadPoolExecutorTest, CPUPoolStats) {
157 poolStats<CPUThreadPoolExecutor>();
160 TEST(ThreadPoolExecutorTest, IOPoolStats) {
161 poolStats<IOThreadPoolExecutor>();
165 static void taskStats() {
167 std::atomic<int> c(0);
168 tpe.subscribeToTaskStats(Observer<ThreadPoolExecutor::TaskStats>::create(
169 [&] (ThreadPoolExecutor::TaskStats stats) {
171 EXPECT_LT(milliseconds(0), stats.runTime);
173 EXPECT_LT(milliseconds(0), stats.waitTime);
182 TEST(ThreadPoolExecutorTest, CPUTaskStats) {
183 taskStats<CPUThreadPoolExecutor>();
186 TEST(ThreadPoolExecutorTest, IOTaskStats) {
187 taskStats<IOThreadPoolExecutor>();
191 static void expiration() {
193 std::atomic<int> statCbCount(0);
194 tpe.subscribeToTaskStats(Observer<ThreadPoolExecutor::TaskStats>::create(
195 [&] (ThreadPoolExecutor::TaskStats stats) {
196 int i = statCbCount++;
198 EXPECT_FALSE(stats.expired);
200 EXPECT_TRUE(stats.expired);
205 std::atomic<int> expireCbCount(0);
206 auto expireCb = [&] () { expireCbCount++; };
207 tpe.add(burnMs(10), seconds(60), expireCb);
208 tpe.add(burnMs(10), milliseconds(10), expireCb);
210 EXPECT_EQ(2, statCbCount);
211 EXPECT_EQ(1, expireCbCount);
214 TEST(ThreadPoolExecutorTest, CPUExpiration) {
215 expiration<CPUThreadPoolExecutor>();
218 TEST(ThreadPoolExecutorTest, IOExpiration) {
219 expiration<IOThreadPoolExecutor>();
222 template <typename TPE>
223 static void futureExecutor() {
224 FutureExecutor<TPE> fe(2);
226 fe.addFuture([] () { return makeFuture<int>(42); }).then(
229 EXPECT_EQ(42, t.value());
231 fe.addFuture([] () { return 100; }).then(
234 EXPECT_EQ(100, t.value());
236 fe.addFuture([] () { return makeFuture(); }).then(
237 [&] (Try<void>&& t) {
239 EXPECT_NO_THROW(t.value());
241 fe.addFuture([] () { return; }).then(
242 [&] (Try<void>&& t) {
244 EXPECT_NO_THROW(t.value());
246 fe.addFuture([] () { throw std::runtime_error("oops"); }).then(
247 [&] (Try<void>&& t) {
249 EXPECT_THROW(t.value(), std::runtime_error);
251 // Test doing actual async work
252 folly::Baton<> baton;
253 fe.addFuture([&] () {
254 auto p = std::make_shared<Promise<int>>();
260 return p->getFuture();
261 }).then([&] (Try<int>&& t) {
262 EXPECT_EQ(42, t.value());
271 TEST(ThreadPoolExecutorTest, CPUFuturePool) {
272 futureExecutor<CPUThreadPoolExecutor>();
275 TEST(ThreadPoolExecutorTest, IOFuturePool) {
276 futureExecutor<IOThreadPoolExecutor>();