/*
 * Copyright 2014 Facebook, Inc.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *   http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
17 #include <folly/experimental/wangle/concurrent/FutureExecutor.h>
18 #include <folly/experimental/wangle/concurrent/ThreadPoolExecutor.h>
19 #include <folly/experimental/wangle/concurrent/CPUThreadPoolExecutor.h>
20 #include <folly/experimental/wangle/concurrent/IOThreadPoolExecutor.h>
21 #include <glog/logging.h>
22 #include <gtest/gtest.h>
24 using namespace folly::wangle;
25 using namespace std::chrono;
27 static Func burnMs(uint64_t ms) {
28 return [ms]() { std::this_thread::sleep_for(milliseconds(ms)); };
37 TEST(ThreadPoolExecutorTest, CPUBasic) {
38 basic<CPUThreadPoolExecutor>();
41 TEST(IOThreadPoolExecutorTest, IOBasic) {
42 basic<IOThreadPoolExecutor>();
// Verifies setNumThreads() can shrink (100 -> 50) and grow (50 -> 150) the
// pool, with numThreads() reporting the new size each time.
// NOTE(review): this chunk was truncated — the template header and the pool
// construction were missing. `TPE tpe(100)` is reconstructed from the first
// EXPECT below and from the resize<TPE>() call sites; confirm against the
// full file.
template <typename TPE>
static void resize() {
  TPE tpe(100);
  EXPECT_EQ(100, tpe.numThreads());
  tpe.setNumThreads(50);
  EXPECT_EQ(50, tpe.numThreads());
  tpe.setNumThreads(150);
  EXPECT_EQ(150, tpe.numThreads());
}
55 TEST(ThreadPoolExecutorTest, CPUResize) {
56 resize<CPUThreadPoolExecutor>();
59 TEST(ThreadPoolExecutorTest, IOResize) {
60 resize<IOThreadPoolExecutor>();
// NOTE(review): truncated fragment of a templated stop() helper — the stale
// inline numbering (66, 71, 75) shows the signature, the task body, and the
// stop/join call are missing from this chunk. Left byte-identical; not
// compilable as-is.
66 std::atomic<int> completed(0);
// Presumably ~1000 tasks are queued here and the pool is stopped mid-run —
// TODO confirm against the full file.
71 for (int i = 0; i < 1000; i++) {
// Stopping before the queue drains: strictly fewer than 1000 tasks complete.
75 EXPECT_GT(1000, completed);
78 TEST(ThreadPoolExecutorTest, CPUStop) {
79 stop<CPUThreadPoolExecutor>();
82 TEST(ThreadPoolExecutorTest, IOStop) {
83 stop<IOThreadPoolExecutor>();
// NOTE(review): truncated fragment of a templated join() helper — the stale
// inline numbering (89, 94, 98) shows the signature, the task body, and the
// join call are missing from this chunk. Left byte-identical; not compilable
// as-is.
89 std::atomic<int> completed(0);
94 for (int i = 0; i < 1000; i++) {
// After join, every queued task must have run to completion.
98 EXPECT_EQ(1000, completed);
101 TEST(ThreadPoolExecutorTest, CPUJoin) {
102 join<CPUThreadPoolExecutor>();
105 TEST(ThreadPoolExecutorTest, IOJoin) {
106 join<IOThreadPoolExecutor>();
// Resizes the pool down (5) then up (15) while 1000 tasks are in flight and
// verifies no task is lost.
// NOTE(review): truncated — the template header, pool construction, the
// add-loop body, and the final join/wait are missing from this chunk (inline
// numbering skips 109, 111, 113-116, 118-119, 122). Left byte-identical.
110 static void resizeUnderLoad() {
112 std::atomic<int> completed(0);
117 for (int i = 0; i < 1000; i++) {
// Shrink then grow while tasks are still queued/running.
120 tpe.setNumThreads(5);
121 tpe.setNumThreads(15);
// Resizing must not drop work: all 1000 tasks eventually complete.
123 EXPECT_EQ(1000, completed);
126 TEST(ThreadPoolExecutorTest, CPUResizeUnderLoad) {
127 resizeUnderLoad<CPUThreadPoolExecutor>();
130 TEST(ThreadPoolExecutorTest, IOResizeUnderLoad) {
131 resizeUnderLoad<IOThreadPoolExecutor>();
// Checks ThreadPoolExecutor::getPoolStats(): a fresh pool reports one idle
// thread and no work; once a blocking task holds the only thread, stats show
// one active thread plus one pending task.
// NOTE(review): truncated — the template header, the pool construction (a
// 1-thread TPE, per threadCount == 1 below), the startBaton.wait()/second
// add() (implied by pendingTaskCount == 1 and totalTaskCount == 2), and the
// endBaton.post()/teardown lines are missing from this chunk.
135 static void poolStats() {
136 folly::Baton<> startBaton, endBaton;
// Freshly built pool: one idle thread, nothing queued or running yet.
138 auto stats = tpe.getPoolStats();
139 EXPECT_EQ(1, stats.threadCount);
140 EXPECT_EQ(1, stats.idleThreadCount);
141 EXPECT_EQ(0, stats.activeThreadCount);
142 EXPECT_EQ(0, stats.pendingTaskCount);
143 EXPECT_EQ(0, stats.totalTaskCount);
// Blocks the only worker until endBaton is posted; startBaton lets the test
// confirm the task is actually running before re-reading stats.
144 tpe.add([&](){ startBaton.post(); endBaton.wait(); });
// With the single thread busy, a second task (added on missing lines) waits.
147 stats = tpe.getPoolStats();
148 EXPECT_EQ(1, stats.threadCount);
149 EXPECT_EQ(0, stats.idleThreadCount);
150 EXPECT_EQ(1, stats.activeThreadCount);
151 EXPECT_EQ(1, stats.pendingTaskCount);
152 EXPECT_EQ(2, stats.totalTaskCount);
156 TEST(ThreadPoolExecutorTest, CPUPoolStats) {
157 poolStats<CPUThreadPoolExecutor>();
160 TEST(ThreadPoolExecutorTest, IOPoolStats) {
161 poolStats<IOThreadPoolExecutor>();
// Subscribes an Observer to per-task TaskStats and asserts observed tasks
// report a positive run time (and, for whichever task the missing guard
// selects, a positive wait time).
// NOTE(review): truncated — the template header, pool construction, the
// guard lines around the two EXPECTs (171/173 in the stale numbering), the
// tasks actually added, and the final counter check (175-182) are missing.
165 static void taskStats() {
167 std::atomic<int> c(0);
168 auto s = tpe.subscribeToTaskStats(
169 Observer<ThreadPoolExecutor::TaskStats>::create(
170 [&](ThreadPoolExecutor::TaskStats stats) {
// Every observed task must have spent some time running...
172 EXPECT_LT(milliseconds(0), stats.runTime);
// ...and this branch (gated by missing lines — TODO confirm) some time queued.
174 EXPECT_LT(milliseconds(0), stats.waitTime);
183 TEST(ThreadPoolExecutorTest, CPUTaskStats) {
184 taskStats<CPUThreadPoolExecutor>();
187 TEST(ThreadPoolExecutorTest, IOTaskStats) {
188 taskStats<IOThreadPoolExecutor>();
// Verifies task expiration: a task whose expiry elapses while it is still
// queued is skipped, its TaskStats are marked expired, and the expire
// callback fires exactly once.
// NOTE(review): truncated — the template header, the pool construction
// (presumably single-threaded so the second task queues behind the first —
// TODO confirm), the if/else guard around the two EXPECTs (stale lines
// 199/201), and the final join/wait lines are missing from this chunk.
192 static void expiration() {
194 std::atomic<int> statCbCount(0);
195 auto s = tpe.subscribeToTaskStats(
196 Observer<ThreadPoolExecutor::TaskStats>::create(
197 [&](ThreadPoolExecutor::TaskStats stats) {
198 int i = statCbCount++;
// First observed task ran normally; the second expired in the queue.
200 EXPECT_FALSE(stats.expired);
202 EXPECT_TRUE(stats.expired);
207 std::atomic<int> expireCbCount(0);
208 auto expireCb = [&] () { expireCbCount++; };
// 60s expiry: runs long before it could expire.
209 tpe.add(burnMs(10), seconds(60), expireCb);
// 10ms expiry while a 10ms task occupies the pool: expected to expire.
210 tpe.add(burnMs(10), milliseconds(10), expireCb);
// Both tasks report stats; only the expired one invokes the callback.
212 EXPECT_EQ(2, statCbCount);
213 EXPECT_EQ(1, expireCbCount);
216 TEST(ThreadPoolExecutorTest, CPUExpiration) {
217 expiration<CPUThreadPoolExecutor>();
220 TEST(ThreadPoolExecutorTest, IOExpiration) {
221 expiration<IOThreadPoolExecutor>();
// Exercises FutureExecutor<TPE>::addFuture() with functors returning
// Future<int>, a plain int, Future<void>, void, a throwing functor, and a
// Promise fulfilled asynchronously — each continuation asserts the Try.
// NOTE(review): truncated — each .then() continuation's trailing lines, a
// completion counter, the promise-fulfilment body (stale lines 257-261), and
// the final waits (265-272) are missing from this chunk. Left byte-identical.
224 template <typename TPE>
225 static void futureExecutor() {
// Two-thread pool wrapped by FutureExecutor.
226 FutureExecutor<TPE> fe(2);
// addFuture unwraps a functor that itself returns a Future<int>.
228 fe.addFuture([] () { return makeFuture<int>(42); }).then(
231 EXPECT_EQ(42, t.value());
// A plain value return is lifted into a Future<int>.
233 fe.addFuture([] () { return 100; }).then(
236 EXPECT_EQ(100, t.value());
// Future<void> and void returns both yield a non-throwing Try<void>.
238 fe.addFuture([] () { return makeFuture(); }).then(
239 [&] (Try<void>&& t) {
241 EXPECT_NO_THROW(t.value());
243 fe.addFuture([] () { return; }).then(
244 [&] (Try<void>&& t) {
246 EXPECT_NO_THROW(t.value());
// A throwing functor surfaces as an exceptional Try.
248 fe.addFuture([] () { throw std::runtime_error("oops"); }).then(
249 [&] (Try<void>&& t) {
251 EXPECT_THROW(t.value(), std::runtime_error);
253 // Test doing actual async work
254 folly::Baton<> baton;
// shared_ptr keeps the Promise alive past the functor's scope; it is
// fulfilled on missing lines 257-261 — TODO confirm it sets 42.
255 fe.addFuture([&] () {
256 auto p = std::make_shared<Promise<int>>();
262 return p->getFuture();
263 }).then([&] (Try<int>&& t) {
264 EXPECT_EQ(42, t.value());
273 TEST(ThreadPoolExecutorTest, CPUFuturePool) {
274 futureExecutor<CPUThreadPoolExecutor>();
277 TEST(ThreadPoolExecutorTest, IOFuturePool) {
278 futureExecutor<IOThreadPoolExecutor>();