2 * Copyright 2015 Facebook, Inc.
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
8 * http://www.apache.org/licenses/LICENSE-2.0
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
17 #include <folly/wangle/concurrent/IOThreadPoolExecutor.h>
19 #include <folly/MoveWrapper.h>
20 #include <glog/logging.h>
22 #include <folly/detail/MemoryIdler.h>
24 namespace folly { namespace wangle {
26 using folly::detail::MemoryIdler;
28 /* Class that will free jemalloc caches and madvise the stack away
29 * if the event loop is unused for some period of time
31 class MemoryIdlerTimeout
32 : public AsyncTimeout , public EventBase::LoopCallback {
34 explicit MemoryIdlerTimeout(EventBase* b) : AsyncTimeout(b), base_(b) {}
36 void timeoutExpired() noexcept override { idled = true; }
38 void runLoopCallback() noexcept override {
40 MemoryIdler::flushLocalMallocCaches();
41 MemoryIdler::unmapUnusedStack(MemoryIdler::kDefaultStackToRetain);
45 std::chrono::steady_clock::duration idleTimeout =
46 MemoryIdler::defaultIdleTimeout.load(
47 std::memory_order_acquire);
49 idleTimeout = MemoryIdler::getVariationTimeout(idleTimeout);
51 scheduleTimeout(std::chrono::duration_cast<std::chrono::milliseconds>(
52 idleTimeout).count());
55 // reschedule this callback for the next event loop.
56 base_->runBeforeLoop(this);
63 IOThreadPoolExecutor::IOThreadPoolExecutor(
65 std::shared_ptr<ThreadFactory> threadFactory,
66 EventBaseManager* ebm)
67 : ThreadPoolExecutor(numThreads, std::move(threadFactory)),
69 eventBaseManager_(ebm) {
70 addThreads(numThreads);
71 CHECK(threadList_.get().size() == numThreads);
74 IOThreadPoolExecutor::~IOThreadPoolExecutor() {
78 void IOThreadPoolExecutor::add(Func func) {
79 add(std::move(func), std::chrono::milliseconds(0));
82 void IOThreadPoolExecutor::add(
84 std::chrono::milliseconds expiration,
85 Func expireCallback) {
86 RWSpinLock::ReadHolder{&threadListLock_};
87 if (threadList_.get().empty()) {
88 throw std::runtime_error("No threads available");
90 auto ioThread = pickThread();
92 auto moveTask = folly::makeMoveWrapper(
93 Task(std::move(func), expiration, std::move(expireCallback)));
94 auto wrappedFunc = [ioThread, moveTask] () mutable {
95 runTask(ioThread, std::move(*moveTask));
96 ioThread->pendingTasks--;
99 ioThread->pendingTasks++;
100 if (!ioThread->eventBase->runInEventBaseThread(std::move(wrappedFunc))) {
101 ioThread->pendingTasks--;
102 throw std::runtime_error("Unable to run func in event base thread");
106 std::shared_ptr<IOThreadPoolExecutor::IOThread>
107 IOThreadPoolExecutor::pickThread() {
111 auto thread = threadList_.get()[nextThread_++ % threadList_.get().size()];
112 return std::static_pointer_cast<IOThread>(thread);
115 EventBase* IOThreadPoolExecutor::getEventBase() {
116 return pickThread()->eventBase;
119 EventBase* IOThreadPoolExecutor::getEventBase(
120 ThreadPoolExecutor::ThreadHandle* h) {
121 auto thread = dynamic_cast<IOThread*>(h);
124 return thread->eventBase;
130 EventBaseManager* IOThreadPoolExecutor::getEventBaseManager() {
131 return eventBaseManager_;
134 std::shared_ptr<ThreadPoolExecutor::Thread>
135 IOThreadPoolExecutor::makeThread() {
136 return std::make_shared<IOThread>(this);
139 void IOThreadPoolExecutor::threadRun(ThreadPtr thread) {
140 const auto ioThread = std::static_pointer_cast<IOThread>(thread);
141 ioThread->eventBase = eventBaseManager_->getEventBase();
142 thisThread_.reset(new std::shared_ptr<IOThread>(ioThread));
144 auto idler = new MemoryIdlerTimeout(ioThread->eventBase);
145 ioThread->eventBase->runBeforeLoop(idler);
147 thread->startupBaton.post();
148 while (ioThread->shouldRun) {
149 ioThread->eventBase->loopForever();
152 while (ioThread->pendingTasks > 0) {
153 ioThread->eventBase->loopOnce();
156 stoppedThreads_.add(ioThread);
158 ioThread->eventBase = nullptr;
159 eventBaseManager_->clearEventBase();
162 // threadListLock_ is writelocked
163 void IOThreadPoolExecutor::stopThreads(size_t n) {
164 for (size_t i = 0; i < n; i++) {
165 const auto ioThread = std::static_pointer_cast<IOThread>(
166 threadList_.get()[i]);
167 for (auto& o : observers_) {
168 o->threadStopped(ioThread.get());
170 ioThread->shouldRun = false;
171 ioThread->eventBase->terminateLoopSoon();
175 // threadListLock_ is readlocked
176 uint64_t IOThreadPoolExecutor::getPendingTaskCount() {
178 for (const auto& thread : threadList_.get()) {
179 auto ioThread = std::static_pointer_cast<IOThread>(thread);
180 size_t pendingTasks = ioThread->pendingTasks;
181 if (pendingTasks > 0 && !ioThread->idle) {
184 count += pendingTasks;