Extend IO pool with external optional event_base_manager
[folly.git] / folly / wangle / concurrent / IOThreadPoolExecutor.cpp
1 /*
2  * Copyright 2015 Facebook, Inc.
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  *   http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16
17 #include <folly/wangle/concurrent/IOThreadPoolExecutor.h>
18
19 #include <folly/MoveWrapper.h>
20 #include <glog/logging.h>
21
22 #include <folly/detail/MemoryIdler.h>
23
24 namespace folly { namespace wangle {
25
26 using folly::detail::MemoryIdler;
27
28 /* Class that will free jemalloc caches and madvise the stack away
29  * if the event loop is unused for some period of time
30  */
31 class MemoryIdlerTimeout
32     : public AsyncTimeout , public EventBase::LoopCallback {
33  public:
34   explicit MemoryIdlerTimeout(EventBase* b) : AsyncTimeout(b), base_(b) {}
35
36   virtual void timeoutExpired() noexcept {
37     idled = true;
38   }
39
40   virtual void runLoopCallback() noexcept {
41     if (idled) {
42       MemoryIdler::flushLocalMallocCaches();
43       MemoryIdler::unmapUnusedStack(MemoryIdler::kDefaultStackToRetain);
44
45       idled = false;
46     } else {
47       std::chrono::steady_clock::duration idleTimeout =
48         MemoryIdler::defaultIdleTimeout.load(
49           std::memory_order_acquire);
50
51       idleTimeout = MemoryIdler::getVariationTimeout(idleTimeout);
52
53       scheduleTimeout(std::chrono::duration_cast<std::chrono::milliseconds>(
54                         idleTimeout).count());
55     }
56
57     // reschedule this callback for the next event loop.
58     base_->runBeforeLoop(this);
59   }
60  private:
61   EventBase* base_;
62   bool idled{false};
63 } ;
64
65 IOThreadPoolExecutor::IOThreadPoolExecutor(
66     size_t numThreads,
67     std::shared_ptr<ThreadFactory> threadFactory,
68     EventBaseManager* ebm)
69   : ThreadPoolExecutor(numThreads, std::move(threadFactory)),
70     nextThread_(0),
71     eventBaseManager_(ebm) {
72   addThreads(numThreads);
73   CHECK(threadList_.get().size() == numThreads);
74 }
75
76 IOThreadPoolExecutor::~IOThreadPoolExecutor() {
77   stop();
78 }
79
80 void IOThreadPoolExecutor::add(Func func) {
81   add(std::move(func), std::chrono::milliseconds(0));
82 }
83
84 void IOThreadPoolExecutor::add(
85     Func func,
86     std::chrono::milliseconds expiration,
87     Func expireCallback) {
88   RWSpinLock::ReadHolder{&threadListLock_};
89   if (threadList_.get().empty()) {
90     throw std::runtime_error("No threads available");
91   }
92   auto ioThread = pickThread();
93
94   auto moveTask = folly::makeMoveWrapper(
95       Task(std::move(func), expiration, std::move(expireCallback)));
96   auto wrappedFunc = [ioThread, moveTask] () mutable {
97     runTask(ioThread, std::move(*moveTask));
98     ioThread->pendingTasks--;
99   };
100
101   ioThread->pendingTasks++;
102   if (!ioThread->eventBase->runInEventBaseThread(std::move(wrappedFunc))) {
103     ioThread->pendingTasks--;
104     throw std::runtime_error("Unable to run func in event base thread");
105   }
106 }
107
108 std::shared_ptr<IOThreadPoolExecutor::IOThread>
109 IOThreadPoolExecutor::pickThread() {
110   if (*thisThread_) {
111     return *thisThread_;
112   }
113   auto thread = threadList_.get()[nextThread_++ % threadList_.get().size()];
114   return std::static_pointer_cast<IOThread>(thread);
115 }
116
117 EventBase* IOThreadPoolExecutor::getEventBase() {
118   return pickThread()->eventBase;
119 }
120
121 EventBase* IOThreadPoolExecutor::getEventBase(
122     ThreadPoolExecutor::ThreadHandle* h) {
123   auto thread = dynamic_cast<IOThread*>(h);
124
125   if (thread) {
126     return thread->eventBase;
127   }
128
129   return nullptr;
130 }
131
132 std::shared_ptr<ThreadPoolExecutor::Thread>
133 IOThreadPoolExecutor::makeThread() {
134   return std::make_shared<IOThread>(this);
135 }
136
137 void IOThreadPoolExecutor::threadRun(ThreadPtr thread) {
138   const auto ioThread = std::static_pointer_cast<IOThread>(thread);
139   ioThread->eventBase = eventBaseManager_->getEventBase();
140   thisThread_.reset(new std::shared_ptr<IOThread>(ioThread));
141
142   auto idler = new MemoryIdlerTimeout(ioThread->eventBase);
143   ioThread->eventBase->runBeforeLoop(idler);
144
145   thread->startupBaton.post();
146   while (ioThread->shouldRun) {
147     ioThread->eventBase->loopForever();
148   }
149   if (isJoin_) {
150     while (ioThread->pendingTasks > 0) {
151       ioThread->eventBase->loopOnce();
152     }
153   }
154   stoppedThreads_.add(ioThread);
155 }
156
157 // threadListLock_ is writelocked
158 void IOThreadPoolExecutor::stopThreads(size_t n) {
159   for (size_t i = 0; i < n; i++) {
160     const auto ioThread = std::static_pointer_cast<IOThread>(
161         threadList_.get()[i]);
162     for (auto& o : observers_) {
163       o->threadStopped(ioThread.get());
164     }
165     ioThread->shouldRun = false;
166     ioThread->eventBase->terminateLoopSoon();
167   }
168 }
169
170 // threadListLock_ is readlocked
171 uint64_t IOThreadPoolExecutor::getPendingTaskCount() {
172   uint64_t count = 0;
173   for (const auto& thread : threadList_.get()) {
174     auto ioThread = std::static_pointer_cast<IOThread>(thread);
175     size_t pendingTasks = ioThread->pendingTasks;
176     if (pendingTasks > 0 && !ioThread->idle) {
177       pendingTasks--;
178     }
179     count += pendingTasks;
180   }
181   return count;
182 }
183
184 }} // folly::wangle