Add missing override and remove redundant virtual in folly
[folly.git] / folly / wangle / concurrent / IOThreadPoolExecutor.cpp
1 /*
2  * Copyright 2015 Facebook, Inc.
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  *   http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16
17 #include <folly/wangle/concurrent/IOThreadPoolExecutor.h>
18
19 #include <folly/MoveWrapper.h>
20 #include <glog/logging.h>
21
22 #include <folly/detail/MemoryIdler.h>
23
24 namespace folly { namespace wangle {
25
26 using folly::detail::MemoryIdler;
27
28 /* Class that will free jemalloc caches and madvise the stack away
29  * if the event loop is unused for some period of time
30  */
31 class MemoryIdlerTimeout
32     : public AsyncTimeout , public EventBase::LoopCallback {
33  public:
34   explicit MemoryIdlerTimeout(EventBase* b) : AsyncTimeout(b), base_(b) {}
35
36   void timeoutExpired() noexcept override { idled = true; }
37
38   void runLoopCallback() noexcept override {
39     if (idled) {
40       MemoryIdler::flushLocalMallocCaches();
41       MemoryIdler::unmapUnusedStack(MemoryIdler::kDefaultStackToRetain);
42
43       idled = false;
44     } else {
45       std::chrono::steady_clock::duration idleTimeout =
46         MemoryIdler::defaultIdleTimeout.load(
47           std::memory_order_acquire);
48
49       idleTimeout = MemoryIdler::getVariationTimeout(idleTimeout);
50
51       scheduleTimeout(std::chrono::duration_cast<std::chrono::milliseconds>(
52                         idleTimeout).count());
53     }
54
55     // reschedule this callback for the next event loop.
56     base_->runBeforeLoop(this);
57   }
58  private:
59   EventBase* base_;
60   bool idled{false};
61 } ;
62
63 IOThreadPoolExecutor::IOThreadPoolExecutor(
64     size_t numThreads,
65     std::shared_ptr<ThreadFactory> threadFactory,
66     EventBaseManager* ebm)
67   : ThreadPoolExecutor(numThreads, std::move(threadFactory)),
68     nextThread_(0),
69     eventBaseManager_(ebm) {
70   addThreads(numThreads);
71   CHECK(threadList_.get().size() == numThreads);
72 }
73
74 IOThreadPoolExecutor::~IOThreadPoolExecutor() {
75   stop();
76 }
77
78 void IOThreadPoolExecutor::add(Func func) {
79   add(std::move(func), std::chrono::milliseconds(0));
80 }
81
82 void IOThreadPoolExecutor::add(
83     Func func,
84     std::chrono::milliseconds expiration,
85     Func expireCallback) {
86   RWSpinLock::ReadHolder{&threadListLock_};
87   if (threadList_.get().empty()) {
88     throw std::runtime_error("No threads available");
89   }
90   auto ioThread = pickThread();
91
92   auto moveTask = folly::makeMoveWrapper(
93       Task(std::move(func), expiration, std::move(expireCallback)));
94   auto wrappedFunc = [ioThread, moveTask] () mutable {
95     runTask(ioThread, std::move(*moveTask));
96     ioThread->pendingTasks--;
97   };
98
99   ioThread->pendingTasks++;
100   if (!ioThread->eventBase->runInEventBaseThread(std::move(wrappedFunc))) {
101     ioThread->pendingTasks--;
102     throw std::runtime_error("Unable to run func in event base thread");
103   }
104 }
105
106 std::shared_ptr<IOThreadPoolExecutor::IOThread>
107 IOThreadPoolExecutor::pickThread() {
108   if (*thisThread_) {
109     return *thisThread_;
110   }
111   auto thread = threadList_.get()[nextThread_++ % threadList_.get().size()];
112   return std::static_pointer_cast<IOThread>(thread);
113 }
114
115 EventBase* IOThreadPoolExecutor::getEventBase() {
116   return pickThread()->eventBase;
117 }
118
119 EventBase* IOThreadPoolExecutor::getEventBase(
120     ThreadPoolExecutor::ThreadHandle* h) {
121   auto thread = dynamic_cast<IOThread*>(h);
122
123   if (thread) {
124     return thread->eventBase;
125   }
126
127   return nullptr;
128 }
129
130 EventBaseManager* IOThreadPoolExecutor::getEventBaseManager() {
131   return eventBaseManager_;
132 }
133
134 std::shared_ptr<ThreadPoolExecutor::Thread>
135 IOThreadPoolExecutor::makeThread() {
136   return std::make_shared<IOThread>(this);
137 }
138
139 void IOThreadPoolExecutor::threadRun(ThreadPtr thread) {
140   const auto ioThread = std::static_pointer_cast<IOThread>(thread);
141   ioThread->eventBase = eventBaseManager_->getEventBase();
142   thisThread_.reset(new std::shared_ptr<IOThread>(ioThread));
143
144   auto idler = new MemoryIdlerTimeout(ioThread->eventBase);
145   ioThread->eventBase->runBeforeLoop(idler);
146
147   thread->startupBaton.post();
148   while (ioThread->shouldRun) {
149     ioThread->eventBase->loopForever();
150   }
151   if (isJoin_) {
152     while (ioThread->pendingTasks > 0) {
153       ioThread->eventBase->loopOnce();
154     }
155   }
156   stoppedThreads_.add(ioThread);
157
158   ioThread->eventBase = nullptr;
159   eventBaseManager_->clearEventBase();
160 }
161
162 // threadListLock_ is writelocked
163 void IOThreadPoolExecutor::stopThreads(size_t n) {
164   for (size_t i = 0; i < n; i++) {
165     const auto ioThread = std::static_pointer_cast<IOThread>(
166         threadList_.get()[i]);
167     for (auto& o : observers_) {
168       o->threadStopped(ioThread.get());
169     }
170     ioThread->shouldRun = false;
171     ioThread->eventBase->terminateLoopSoon();
172   }
173 }
174
175 // threadListLock_ is readlocked
176 uint64_t IOThreadPoolExecutor::getPendingTaskCount() {
177   uint64_t count = 0;
178   for (const auto& thread : threadList_.get()) {
179     auto ioThread = std::static_pointer_cast<IOThread>(thread);
180     size_t pendingTasks = ioThread->pendingTasks;
181     if (pendingTasks > 0 && !ioThread->idle) {
182       pendingTasks--;
183     }
184     count += pendingTasks;
185   }
186   return count;
187 }
188
189 }} // folly::wangle