remove Cpp2WorkerFactory
[folly.git] / folly / experimental / wangle / concurrent / IOThreadPoolExecutor.cpp
1 /*
2  * Copyright 2014 Facebook, Inc.
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  *   http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16
17 #include <folly/experimental/wangle/concurrent/IOThreadPoolExecutor.h>
18
19 #include <folly/MoveWrapper.h>
20 #include <glog/logging.h>
21 #include <folly/io/async/EventBaseManager.h>
22
23 #include <folly/detail/MemoryIdler.h>
24
25 namespace folly { namespace wangle {
26
27 using folly::detail::MemoryIdler;
28
29 /* Class that will free jemalloc caches and madvise the stack away
30  * if the event loop is unused for some period of time
31  */
32 class MemoryIdlerTimeout
33     : public AsyncTimeout , public EventBase::LoopCallback {
34  public:
35   explicit MemoryIdlerTimeout(EventBase* b) : AsyncTimeout(b), base_(b) {}
36
37   virtual void timeoutExpired() noexcept {
38     idled = true;
39   }
40
41   virtual void runLoopCallback() noexcept {
42     if (idled) {
43       MemoryIdler::flushLocalMallocCaches();
44       MemoryIdler::unmapUnusedStack(MemoryIdler::kDefaultStackToRetain);
45
46       idled = false;
47     } else {
48       std::chrono::steady_clock::duration idleTimeout =
49         MemoryIdler::defaultIdleTimeout.load(
50           std::memory_order_acquire);
51
52       idleTimeout = MemoryIdler::getVariationTimeout(idleTimeout);
53
54       scheduleTimeout(std::chrono::duration_cast<std::chrono::milliseconds>(
55                         idleTimeout).count());
56     }
57
58     // reschedule this callback for the next event loop.
59     base_->runBeforeLoop(this);
60   }
61  private:
62   EventBase* base_;
63   bool idled{false};
64 } ;
65
66 IOThreadPoolExecutor::IOThreadPoolExecutor(
67     size_t numThreads,
68     std::shared_ptr<ThreadFactory> threadFactory)
69   : ThreadPoolExecutor(numThreads, std::move(threadFactory)),
70     nextThread_(0) {
71   addThreads(numThreads);
72   CHECK(threadList_.get().size() == numThreads);
73 }
74
75 IOThreadPoolExecutor::~IOThreadPoolExecutor() {
76   stop();
77 }
78
79 void IOThreadPoolExecutor::add(Func func) {
80   add(std::move(func), std::chrono::milliseconds(0));
81 }
82
83 void IOThreadPoolExecutor::add(
84     Func func,
85     std::chrono::milliseconds expiration,
86     Func expireCallback) {
87   RWSpinLock::ReadHolder{&threadListLock_};
88   if (threadList_.get().empty()) {
89     throw std::runtime_error("No threads available");
90   }
91   auto ioThread = pickThread();
92
93   auto moveTask = folly::makeMoveWrapper(
94       Task(std::move(func), expiration, std::move(expireCallback)));
95   auto wrappedFunc = [ioThread, moveTask] () mutable {
96     runTask(ioThread, std::move(*moveTask));
97     ioThread->pendingTasks--;
98   };
99
100   ioThread->pendingTasks++;
101   if (!ioThread->eventBase->runInEventBaseThread(std::move(wrappedFunc))) {
102     ioThread->pendingTasks--;
103     throw std::runtime_error("Unable to run func in event base thread");
104   }
105 }
106
107 std::shared_ptr<IOThreadPoolExecutor::IOThread>
108 IOThreadPoolExecutor::pickThread() {
109   if (*thisThread_) {
110     return *thisThread_;
111   }
112   auto thread = threadList_.get()[nextThread_++ % threadList_.get().size()];
113   return std::static_pointer_cast<IOThread>(thread);
114 }
115
116 EventBase* IOThreadPoolExecutor::getEventBase() {
117   return pickThread()->eventBase;
118 }
119
120 std::shared_ptr<ThreadPoolExecutor::Thread>
121 IOThreadPoolExecutor::makeThread() {
122   return std::make_shared<IOThread>(this);
123 }
124
125 void IOThreadPoolExecutor::threadRun(ThreadPtr thread) {
126   const auto ioThread = std::static_pointer_cast<IOThread>(thread);
127   ioThread->eventBase =
128     folly::EventBaseManager::get()->getEventBase();
129   thisThread_.reset(new std::shared_ptr<IOThread>(ioThread));
130
131   auto idler = new MemoryIdlerTimeout(ioThread->eventBase);
132   ioThread->eventBase->runBeforeLoop(idler);
133
134   thread->startupBaton.post();
135   while (ioThread->shouldRun) {
136     ioThread->eventBase->loopForever();
137   }
138   if (isJoin_) {
139     while (ioThread->pendingTasks > 0) {
140       ioThread->eventBase->loopOnce();
141     }
142   }
143   stoppedThreads_.add(ioThread);
144 }
145
146 // threadListLock_ is writelocked
147 void IOThreadPoolExecutor::stopThreads(size_t n) {
148   for (size_t i = 0; i < n; i++) {
149     const auto ioThread = std::static_pointer_cast<IOThread>(
150         threadList_.get()[i]);
151     ioThread->shouldRun = false;
152     ioThread->eventBase->terminateLoopSoon();
153   }
154 }
155
156 std::vector<EventBase*> IOThreadPoolExecutor::getEventBases() {
157   std::vector<EventBase*> bases;
158   RWSpinLock::ReadHolder{&threadListLock_};
159   for (const auto& thread : threadList_.get()) {
160     auto ioThread = std::static_pointer_cast<IOThread>(thread);
161     bases.push_back(ioThread->eventBase);
162   }
163   return bases;
164 }
165
166 // threadListLock_ is readlocked
167 uint64_t IOThreadPoolExecutor::getPendingTaskCount() {
168   uint64_t count = 0;
169   for (const auto& thread : threadList_.get()) {
170     auto ioThread = std::static_pointer_cast<IOThread>(thread);
171     size_t pendingTasks = ioThread->pendingTasks;
172     if (pendingTasks > 0 && !ioThread->idle) {
173       pendingTasks--;
174     }
175     count += pendingTasks;
176   }
177   return count;
178 }
179
180 }} // folly::wangle