Fix copyright lines
[folly.git] / folly / executors / IOThreadPoolExecutor.cpp
1 /*
2  * Copyright 2017-present Facebook, Inc.
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  *   http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16
17 #include <folly/executors/IOThreadPoolExecutor.h>
18
19 #include <glog/logging.h>
20
21 #include <folly/detail/MemoryIdler.h>
22
23 namespace folly {
24
25 using folly::detail::MemoryIdler;
26
27 /* Class that will free jemalloc caches and madvise the stack away
28  * if the event loop is unused for some period of time
29  */
30 class MemoryIdlerTimeout : public AsyncTimeout, public EventBase::LoopCallback {
31  public:
32   explicit MemoryIdlerTimeout(EventBase* b) : AsyncTimeout(b), base_(b) {}
33
34   void timeoutExpired() noexcept override {
35     idled = true;
36   }
37
38   void runLoopCallback() noexcept override {
39     if (idled) {
40       MemoryIdler::flushLocalMallocCaches();
41       MemoryIdler::unmapUnusedStack(MemoryIdler::kDefaultStackToRetain);
42
43       idled = false;
44     } else {
45       std::chrono::steady_clock::duration idleTimeout =
46           MemoryIdler::defaultIdleTimeout.load(std::memory_order_acquire);
47
48       idleTimeout = MemoryIdler::getVariationTimeout(idleTimeout);
49
50       scheduleTimeout(static_cast<uint32_t>(
51           std::chrono::duration_cast<std::chrono::milliseconds>(idleTimeout)
52               .count()));
53     }
54
55     // reschedule this callback for the next event loop.
56     base_->runBeforeLoop(this);
57   }
58
59  private:
60   EventBase* base_;
61   bool idled{false};
62 };
63
64 IOThreadPoolExecutor::IOThreadPoolExecutor(
65     size_t numThreads,
66     std::shared_ptr<ThreadFactory> threadFactory,
67     EventBaseManager* ebm,
68     bool waitForAll)
69     : ThreadPoolExecutor(numThreads, std::move(threadFactory), waitForAll),
70       nextThread_(0),
71       eventBaseManager_(ebm) {
72   setNumThreads(numThreads);
73 }
74
75 IOThreadPoolExecutor::~IOThreadPoolExecutor() {
76   stop();
77 }
78
79 void IOThreadPoolExecutor::add(Func func) {
80   add(std::move(func), std::chrono::milliseconds(0));
81 }
82
83 void IOThreadPoolExecutor::add(
84     Func func,
85     std::chrono::milliseconds expiration,
86     Func expireCallback) {
87   RWSpinLock::ReadHolder r{&threadListLock_};
88   if (threadList_.get().empty()) {
89     throw std::runtime_error("No threads available");
90   }
91   auto ioThread = pickThread();
92
93   auto task = Task(std::move(func), expiration, std::move(expireCallback));
94   auto wrappedFunc = [ ioThread, task = std::move(task) ]() mutable {
95     runTask(ioThread, std::move(task));
96     ioThread->pendingTasks--;
97   };
98
99   ioThread->pendingTasks++;
100   if (!ioThread->eventBase->runInEventBaseThread(std::move(wrappedFunc))) {
101     ioThread->pendingTasks--;
102     throw std::runtime_error("Unable to run func in event base thread");
103   }
104 }
105
106 std::shared_ptr<IOThreadPoolExecutor::IOThread>
107 IOThreadPoolExecutor::pickThread() {
108   auto& me = *thisThread_;
109   auto& ths = threadList_.get();
110   // When new task is added to IOThreadPoolExecutor, a thread is chosen for it
111   // to be executed on, thisThread_ is by default chosen, however, if the new
112   // task is added by the clean up operations on thread destruction, thisThread_
113   // is not an available thread anymore, thus, always check whether or not
114   // thisThread_ is an available thread before choosing it.
115   if (me && std::find(ths.cbegin(), ths.cend(), me) != ths.cend()) {
116     return me;
117   }
118   auto n = ths.size();
119   if (n == 0) {
120     return me;
121   }
122   auto thread = ths[nextThread_.fetch_add(1, std::memory_order_relaxed) % n];
123   return std::static_pointer_cast<IOThread>(thread);
124 }
125
126 EventBase* IOThreadPoolExecutor::getEventBase() {
127   RWSpinLock::ReadHolder r{&threadListLock_};
128   return pickThread()->eventBase;
129 }
130
131 EventBase* IOThreadPoolExecutor::getEventBase(
132     ThreadPoolExecutor::ThreadHandle* h) {
133   auto thread = dynamic_cast<IOThread*>(h);
134
135   if (thread) {
136     return thread->eventBase;
137   }
138
139   return nullptr;
140 }
141
142 EventBaseManager* IOThreadPoolExecutor::getEventBaseManager() {
143   return eventBaseManager_;
144 }
145
146 std::shared_ptr<ThreadPoolExecutor::Thread> IOThreadPoolExecutor::makeThread() {
147   return std::make_shared<IOThread>(this);
148 }
149
150 void IOThreadPoolExecutor::threadRun(ThreadPtr thread) {
151   this->threadPoolHook_.registerThread();
152
153   const auto ioThread = std::static_pointer_cast<IOThread>(thread);
154   ioThread->eventBase = eventBaseManager_->getEventBase();
155   thisThread_.reset(new std::shared_ptr<IOThread>(ioThread));
156
157   auto idler = std::make_unique<MemoryIdlerTimeout>(ioThread->eventBase);
158   ioThread->eventBase->runBeforeLoop(idler.get());
159
160   ioThread->eventBase->runInEventBaseThread(
161       [thread] { thread->startupBaton.post(); });
162   while (ioThread->shouldRun) {
163     ioThread->eventBase->loopForever();
164   }
165   if (isJoin_) {
166     while (ioThread->pendingTasks > 0) {
167       ioThread->eventBase->loopOnce();
168     }
169   }
170   idler.reset();
171   if (isWaitForAll_) {
172     // some tasks, like thrift asynchronous calls, create additional
173     // event base hookups, let's wait till all of them complete.
174     ioThread->eventBase->loop();
175   }
176
177   std::lock_guard<std::mutex> guard(ioThread->eventBaseShutdownMutex_);
178   ioThread->eventBase = nullptr;
179   eventBaseManager_->clearEventBase();
180 }
181
182 // threadListLock_ is writelocked
183 void IOThreadPoolExecutor::stopThreads(size_t n) {
184   std::vector<ThreadPtr> stoppedThreads;
185   stoppedThreads.reserve(n);
186   for (size_t i = 0; i < n; i++) {
187     const auto ioThread =
188         std::static_pointer_cast<IOThread>(threadList_.get()[i]);
189     for (auto& o : observers_) {
190       o->threadStopped(ioThread.get());
191     }
192     ioThread->shouldRun = false;
193     stoppedThreads.push_back(ioThread);
194     std::lock_guard<std::mutex> guard(ioThread->eventBaseShutdownMutex_);
195     if (ioThread->eventBase) {
196       ioThread->eventBase->terminateLoopSoon();
197     }
198   }
199   for (auto thread : stoppedThreads) {
200     stoppedThreads_.add(thread);
201     threadList_.remove(thread);
202   }
203 }
204
205 // threadListLock_ is readlocked
206 uint64_t IOThreadPoolExecutor::getPendingTaskCountImpl(
207     const folly::RWSpinLock::ReadHolder&) {
208   uint64_t count = 0;
209   for (const auto& thread : threadList_.get()) {
210     auto ioThread = std::static_pointer_cast<IOThread>(thread);
211     size_t pendingTasks = ioThread->pendingTasks;
212     if (pendingTasks > 0 && !ioThread->idle) {
213       pendingTasks--;
214     }
215     count += pendingTasks;
216   }
217   return count;
218 }
219
220 } // namespace folly