2 * Copyright 2014 Facebook, Inc.
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
8 * http://www.apache.org/licenses/LICENSE-2.0
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
17 #include <folly/experimental/wangle/concurrent/IOThreadPoolExecutor.h>
19 #include <folly/MoveWrapper.h>
20 #include <glog/logging.h>
22 namespace folly { namespace wangle {
// Constructor. Forwards numThreads and the (moved) thread factory to the
// ThreadPoolExecutor base, then calls addThreads(numThreads) and CHECKs that
// threadList_ ends up holding exactly numThreads entries.
// NOTE(review): the declaration of `numThreads` and the remainder of the
// member-initializer list are not visible in this excerpt -- confirm them
// against the full file before relying on this description.
24 IOThreadPoolExecutor::IOThreadPoolExecutor(
26 std::unique_ptr<ThreadFactory> threadFactory)
27 : ThreadPoolExecutor(numThreads, std::move(threadFactory)),
// Populate the pool.
29 addThreads(numThreads);
// Sanity check: the thread list must contain one entry per requested thread.
30 CHECK(threadList_.get().size() == numThreads);
33 IOThreadPoolExecutor::~IOThreadPoolExecutor() {
37 void IOThreadPoolExecutor::add(Func func) {
38 add(std::move(func), std::chrono::milliseconds(0));
41 void IOThreadPoolExecutor::add(
43 std::chrono::milliseconds expiration,
44 Func expireCallback) {
45 RWSpinLock::ReadHolder{&threadListLock_};
46 if (threadList_.get().empty()) {
47 throw std::runtime_error("No threads available");
49 auto thread = threadList_.get()[nextThread_++ % threadList_.get().size()];
50 auto ioThread = std::static_pointer_cast<IOThread>(thread);
52 auto moveTask = folly::makeMoveWrapper(
53 Task(std::move(func), expiration, std::move(expireCallback)));
54 auto wrappedFunc = [this, ioThread, moveTask] () mutable {
55 runTask(ioThread, std::move(*moveTask));
56 ioThread->pendingTasks--;
59 ioThread->pendingTasks++;
60 if (!ioThread->eventBase.runInEventBaseThread(std::move(wrappedFunc))) {
61 ioThread->pendingTasks--;
62 throw std::runtime_error("Unable to run func in event base thread");
66 std::shared_ptr<ThreadPoolExecutor::Thread>
67 IOThreadPoolExecutor::makeThread() {
68 return std::make_shared<IOThread>();
// Body executed for one pool thread. `thread` is downcast to IOThread, the
// concrete type this executor creates in makeThread().
// NOTE(review): some lines of this function (between and after the two loops)
// are not visible in this excerpt -- confirm the exact control flow against
// the full file.
71 void IOThreadPoolExecutor::threadRun(ThreadPtr thread) {
72 const auto ioThread = std::static_pointer_cast<IOThread>(thread);
// Re-enter the event loop for as long as shouldRun stays set; stopThreads()
// clears the flag and calls terminateLoopSoon() on the same EventBase.
73 while (ioThread->shouldRun) {
74 ioThread->eventBase.loopForever();
// Run single loop iterations while this thread's pendingTasks counter
// (incremented in add(), decremented after each task runs) is still positive.
77 while (ioThread->pendingTasks > 0) {
78 ioThread->eventBase.loopOnce();
// Hand the finished thread object to stoppedThreads_.
81 stoppedThreads_.add(ioThread);
84 // threadListLock_ is writelocked
85 void IOThreadPoolExecutor::stopThreads(size_t n) {
86 for (int i = 0; i < n; i++) {
87 const auto ioThread = std::static_pointer_cast<IOThread>(
88 threadList_.get()[i]);
89 ioThread->shouldRun = false;
90 ioThread->eventBase.terminateLoopSoon();
94 // threadListLock_ is readlocked
95 uint64_t IOThreadPoolExecutor::getPendingTaskCount() {
97 for (const auto& thread : threadList_.get()) {
98 auto ioThread = std::static_pointer_cast<IOThread>(thread);
99 size_t pendingTasks = ioThread->pendingTasks;
100 if (pendingTasks > 0 && !ioThread->idle) {
103 count += pendingTasks;