2 * Copyright 2017 Facebook, Inc.
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
8 * http://www.apache.org/licenses/LICENSE-2.0
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
17 #include <folly/executors/IOThreadPoolExecutor.h>
19 #include <glog/logging.h>
21 #include <folly/detail/MemoryIdler.h>
25 using folly::detail::MemoryIdler;
27 /* Class that will free jemalloc caches and madvise the stack away
28 * if the event loop is unused for some period of time
30 class MemoryIdlerTimeout : public AsyncTimeout, public EventBase::LoopCallback {
32 explicit MemoryIdlerTimeout(EventBase* b) : AsyncTimeout(b), base_(b) {}
34 void timeoutExpired() noexcept override {
38 void runLoopCallback() noexcept override {
40 MemoryIdler::flushLocalMallocCaches();
41 MemoryIdler::unmapUnusedStack(MemoryIdler::kDefaultStackToRetain);
45 std::chrono::steady_clock::duration idleTimeout =
46 MemoryIdler::defaultIdleTimeout.load(std::memory_order_acquire);
48 idleTimeout = MemoryIdler::getVariationTimeout(idleTimeout);
50 scheduleTimeout(static_cast<uint32_t>(
51 std::chrono::duration_cast<std::chrono::milliseconds>(idleTimeout)
55 // reschedule this callback for the next event loop.
56 base_->runBeforeLoop(this);
64 IOThreadPoolExecutor::IOThreadPoolExecutor(
66 std::shared_ptr<ThreadFactory> threadFactory,
67 EventBaseManager* ebm,
69 : ThreadPoolExecutor(numThreads, std::move(threadFactory), waitForAll),
71 eventBaseManager_(ebm) {
72 setNumThreads(numThreads);
75 IOThreadPoolExecutor::~IOThreadPoolExecutor() {
79 void IOThreadPoolExecutor::add(Func func) {
80 add(std::move(func), std::chrono::milliseconds(0));
83 void IOThreadPoolExecutor::add(
85 std::chrono::milliseconds expiration,
86 Func expireCallback) {
87 RWSpinLock::ReadHolder r{&threadListLock_};
88 if (threadList_.get().empty()) {
89 throw std::runtime_error("No threads available");
91 auto ioThread = pickThread();
93 auto task = Task(std::move(func), expiration, std::move(expireCallback));
94 auto wrappedFunc = [ ioThread, task = std::move(task) ]() mutable {
95 runTask(ioThread, std::move(task));
96 ioThread->pendingTasks--;
99 ioThread->pendingTasks++;
100 if (!ioThread->eventBase->runInEventBaseThread(std::move(wrappedFunc))) {
101 ioThread->pendingTasks--;
102 throw std::runtime_error("Unable to run func in event base thread");
106 std::shared_ptr<IOThreadPoolExecutor::IOThread>
107 IOThreadPoolExecutor::pickThread() {
108 auto& me = *thisThread_;
109 auto& ths = threadList_.get();
110 // When new task is added to IOThreadPoolExecutor, a thread is chosen for it
111 // to be executed on, thisThread_ is by default chosen, however, if the new
112 // task is added by the clean up operations on thread destruction, thisThread_
113 // is not an available thread anymore, thus, always check whether or not
114 // thisThread_ is an available thread before choosing it.
115 if (me && std::find(ths.cbegin(), ths.cend(), me) != ths.cend()) {
122 auto thread = ths[nextThread_.fetch_add(1, std::memory_order_relaxed) % n];
123 return std::static_pointer_cast<IOThread>(thread);
126 EventBase* IOThreadPoolExecutor::getEventBase() {
127 RWSpinLock::ReadHolder r{&threadListLock_};
128 return pickThread()->eventBase;
131 EventBase* IOThreadPoolExecutor::getEventBase(
132 ThreadPoolExecutor::ThreadHandle* h) {
133 auto thread = dynamic_cast<IOThread*>(h);
136 return thread->eventBase;
142 EventBaseManager* IOThreadPoolExecutor::getEventBaseManager() {
143 return eventBaseManager_;
146 std::shared_ptr<ThreadPoolExecutor::Thread> IOThreadPoolExecutor::makeThread() {
147 return std::make_shared<IOThread>(this);
150 void IOThreadPoolExecutor::threadRun(ThreadPtr thread) {
151 this->threadPoolHook_.registerThread();
153 const auto ioThread = std::static_pointer_cast<IOThread>(thread);
154 ioThread->eventBase = eventBaseManager_->getEventBase();
155 thisThread_.reset(new std::shared_ptr<IOThread>(ioThread));
157 auto idler = std::make_unique<MemoryIdlerTimeout>(ioThread->eventBase);
158 ioThread->eventBase->runBeforeLoop(idler.get());
160 ioThread->eventBase->runInEventBaseThread(
161 [thread] { thread->startupBaton.post(); });
162 while (ioThread->shouldRun) {
163 ioThread->eventBase->loopForever();
166 while (ioThread->pendingTasks > 0) {
167 ioThread->eventBase->loopOnce();
172 // some tasks, like thrift asynchronous calls, create additional
173 // event base hookups, let's wait till all of them complete.
174 ioThread->eventBase->loop();
177 std::lock_guard<std::mutex> guard(ioThread->eventBaseShutdownMutex_);
178 ioThread->eventBase = nullptr;
179 eventBaseManager_->clearEventBase();
182 // threadListLock_ is writelocked
183 void IOThreadPoolExecutor::stopThreads(size_t n) {
184 std::vector<ThreadPtr> stoppedThreads;
185 stoppedThreads.reserve(n);
186 for (size_t i = 0; i < n; i++) {
187 const auto ioThread =
188 std::static_pointer_cast<IOThread>(threadList_.get()[i]);
189 for (auto& o : observers_) {
190 o->threadStopped(ioThread.get());
192 ioThread->shouldRun = false;
193 stoppedThreads.push_back(ioThread);
194 std::lock_guard<std::mutex> guard(ioThread->eventBaseShutdownMutex_);
195 if (ioThread->eventBase) {
196 ioThread->eventBase->terminateLoopSoon();
199 for (auto thread : stoppedThreads) {
200 stoppedThreads_.add(thread);
201 threadList_.remove(thread);
205 // threadListLock_ is readlocked
206 uint64_t IOThreadPoolExecutor::getPendingTaskCountImpl(
207 const folly::RWSpinLock::ReadHolder&) {
209 for (const auto& thread : threadList_.get()) {
210 auto ioThread = std::static_pointer_cast<IOThread>(thread);
211 size_t pendingTasks = ioThread->pendingTasks;
212 if (pendingTasks > 0 && !ioThread->idle) {
215 count += pendingTasks;