2 * Copyright 2014 Facebook, Inc.
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
8 * http://www.apache.org/licenses/LICENSE-2.0
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
17 #include <folly/experimental/wangle/concurrent/IOThreadPoolExecutor.h>
19 #include <folly/MoveWrapper.h>
20 #include <glog/logging.h>
21 #include <folly/io/async/EventBaseManager.h>
23 #include <folly/detail/MemoryIdler.h>
25 namespace folly { namespace wangle {
27 using folly::detail::MemoryIdler;
29 /* Class that will free jemalloc caches and madvise the stack away
30 * if the event loop is unused for some period of time
// Helper that is both an AsyncTimeout and an EventBase::LoopCallback.
// NOTE(review): several short lines of this class (member bodies, access
// specifiers, data members) are not visible in this excerpt, so the notes
// below describe only the statements that are shown.
32 class MemoryIdlerTimeout
33 : public AsyncTimeout , public EventBase::LoopCallback {
// Attaches the timeout to event base `b` and stores `b` in base_.
35 explicit MemoryIdlerTimeout(EventBase* b) : AsyncTimeout(b), base_(b) {}
// Timeout hook. NOTE(review): the body is not visible in this excerpt.
37 virtual void timeoutExpired() noexcept {
// Loop callback; it is registered through EventBase::runBeforeLoop().
41 virtual void runLoopCallback() noexcept {
// Flush the local malloc caches and unmap the unused stack, retaining
// kDefaultStackToRetain. NOTE(review): presumably guarded by an "idle"
// check whose line is missing from this excerpt -- confirm.
43 MemoryIdler::flushLocalMallocCaches();
44 MemoryIdler::unmapUnusedStack(MemoryIdler::kDefaultStackToRetain);
// Read the configured default idle timeout (acquire load).
48 std::chrono::steady_clock::duration idleTimeout =
49 MemoryIdler::defaultIdleTimeout.load(
50 std::memory_order_acquire);
// Let MemoryIdler adjust the timeout via getVariationTimeout().
52 idleTimeout = MemoryIdler::getVariationTimeout(idleTimeout);
// (Re)arm this timeout; scheduleTimeout() is given the value in milliseconds.
54 scheduleTimeout(std::chrono::duration_cast<std::chrono::milliseconds>(
55 idleTimeout).count());
58 // reschedule this callback for the next event loop.
59 base_->runBeforeLoop(this);
// Constructor: forwards numThreads and the thread factory to the
// ThreadPoolExecutor base, calls addThreads(numThreads), and CHECK-fails
// unless threadList_ then holds exactly numThreads entries.
// NOTE(review): part of the parameter list and of the member-initializer
// list is not visible in this excerpt.
66 IOThreadPoolExecutor::IOThreadPoolExecutor(
68 std::shared_ptr<ThreadFactory> threadFactory)
69 : ThreadPoolExecutor(numThreads, std::move(threadFactory)),
71 addThreads(numThreads);
72 CHECK(threadList_.get().size() == numThreads);
// Destructor. NOTE(review): the body is not visible in this excerpt.
75 IOThreadPoolExecutor::~IOThreadPoolExecutor() {
79 void IOThreadPoolExecutor::add(Func func) {
80 add(std::move(func), std::chrono::milliseconds(0));
83 void IOThreadPoolExecutor::add(
85 std::chrono::milliseconds expiration,
86 Func expireCallback) {
87 RWSpinLock::ReadHolder{&threadListLock_};
88 if (threadList_.get().empty()) {
89 throw std::runtime_error("No threads available");
91 auto ioThread = pickThread();
93 auto moveTask = folly::makeMoveWrapper(
94 Task(std::move(func), expiration, std::move(expireCallback)));
95 auto wrappedFunc = [ioThread, moveTask] () mutable {
96 runTask(ioThread, std::move(*moveTask));
97 ioThread->pendingTasks--;
100 ioThread->pendingTasks++;
101 if (!ioThread->eventBase->runInEventBaseThread(std::move(wrappedFunc))) {
102 ioThread->pendingTasks--;
103 throw std::runtime_error("Unable to run func in event base thread");
// Chooses the IO thread that should receive the next piece of work.
// The visible path is round-robin: nextThread_ is post-incremented and taken
// modulo the current size of threadList_ to index into it; the selected entry
// is returned downcast to IOThread.
// NOTE(review): lines between the signature and the round-robin statement
// are missing from this excerpt; there may be an earlier return path --
// confirm.
// NOTE(review): the modulo uses threadList_.get().size() as divisor, so the
// list must be non-empty on this path; add() checks for that before calling,
// getEventBase() does not.
107 std::shared_ptr<IOThreadPoolExecutor::IOThread>
108 IOThreadPoolExecutor::pickThread() {
112 auto thread = threadList_.get()[nextThread_++ % threadList_.get().size()];
113 return std::static_pointer_cast<IOThread>(thread);
116 EventBase* IOThreadPoolExecutor::getEventBase() {
117 return pickThread()->eventBase;
120 std::shared_ptr<ThreadPoolExecutor::Thread>
121 IOThreadPoolExecutor::makeThread() {
122 return std::make_shared<IOThread>(this);
// Body executed by each pool thread.
// NOTE(review): some short lines of this function are missing from this
// excerpt (around the drain loop in particular), so the second loop may be
// guarded by a condition that is not shown -- confirm against the full file.
125 void IOThreadPoolExecutor::threadRun(ThreadPtr thread) {
126 const auto ioThread = std::static_pointer_cast<IOThread>(thread);
// Obtain an EventBase from the EventBaseManager and publish it on the
// IOThread.
127 ioThread->eventBase =
128 folly::EventBaseManager::get()->getEventBase();
// Record this thread's IOThread in thisThread_.
129 thisThread_.reset(new std::shared_ptr<IOThread>(ioThread));
// Register the memory idler as a before-loop callback.
// NOTE(review): `idler` comes from a raw `new` and no matching delete is
// visible in this function -- looks like a leak per thread; confirm.
131 auto idler = new MemoryIdlerTimeout(ioThread->eventBase);
132 ioThread->eventBase->runBeforeLoop(idler);
// Signal startup only after eventBase has been assigned above.
134 thread->startupBaton.post();
// Run the event loop until shouldRun is cleared (stopThreads() clears it and
// calls terminateLoopSoon()).
135 while (ioThread->shouldRun) {
136 ioThread->eventBase->loopForever();
// Drain: keep iterating the loop while tasks are still pending.
139 while (ioThread->pendingTasks > 0) {
140 ioThread->eventBase->loopOnce();
// Report this thread as stopped.
143 stoppedThreads_.add(ioThread);
146 // threadListLock_ is writelocked
147 void IOThreadPoolExecutor::stopThreads(size_t n) {
148 for (size_t i = 0; i < n; i++) {
149 const auto ioThread = std::static_pointer_cast<IOThread>(
150 threadList_.get()[i]);
151 ioThread->shouldRun = false;
152 ioThread->eventBase->terminateLoopSoon();
156 std::vector<EventBase*> IOThreadPoolExecutor::getEventBases() {
157 std::vector<EventBase*> bases;
158 RWSpinLock::ReadHolder{&threadListLock_};
159 for (const auto& thread : threadList_.get()) {
160 auto ioThread = std::static_pointer_cast<IOThread>(thread);
161 bases.push_back(ioThread->eventBase);
166 // threadListLock_ is readlocked
167 uint64_t IOThreadPoolExecutor::getPendingTaskCount() {
169 for (const auto& thread : threadList_.get()) {
170 auto ioThread = std::static_pointer_cast<IOThread>(thread);
171 size_t pendingTasks = ioThread->pendingTasks;
172 if (pendingTasks > 0 && !ioThread->idle) {
175 count += pendingTasks;