removing non-existing file from the build
[folly.git] / folly / wangle / concurrent / ThreadPoolExecutor.cpp
1 /*
2  * Copyright 2015 Facebook, Inc.
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  *   http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16
17 #include <folly/wangle/concurrent/ThreadPoolExecutor.h>
18
19 namespace folly { namespace wangle {
20
21 ThreadPoolExecutor::ThreadPoolExecutor(
22     size_t numThreads,
23     std::shared_ptr<ThreadFactory> threadFactory)
24     : threadFactory_(std::move(threadFactory)),
25       taskStatsSubject_(std::make_shared<Subject<TaskStats>>()) {}
26
27 ThreadPoolExecutor::~ThreadPoolExecutor() {
28   CHECK(threadList_.get().size() == 0);
29 }
30
31 ThreadPoolExecutor::Task::Task(
32     Func&& func,
33     std::chrono::milliseconds expiration,
34     Func&& expireCallback)
35     : func_(std::move(func)),
36       expiration_(expiration),
37       expireCallback_(std::move(expireCallback)) {
38   // Assume that the task in enqueued on creation
39   enqueueTime_ = std::chrono::steady_clock::now();
40 }
41
42 void ThreadPoolExecutor::runTask(
43     const ThreadPtr& thread,
44     Task&& task) {
45   thread->idle = false;
46   auto startTime = std::chrono::steady_clock::now();
47   task.stats_.waitTime = startTime - task.enqueueTime_;
48   if (task.expiration_ > std::chrono::milliseconds(0) &&
49       task.stats_.waitTime >= task.expiration_) {
50     task.stats_.expired = true;
51     if (task.expireCallback_ != nullptr) {
52       task.expireCallback_();
53     }
54   } else {
55     try {
56       task.func_();
57     } catch (const std::exception& e) {
58       LOG(ERROR) << "ThreadPoolExecutor: func threw unhandled " <<
59                     typeid(e).name() << " exception: " << e.what();
60     } catch (...) {
61       LOG(ERROR) << "ThreadPoolExecutor: func threw unhandled non-exception "
62                     "object";
63     }
64     task.stats_.runTime = std::chrono::steady_clock::now() - startTime;
65   }
66   thread->idle = true;
67   thread->taskStatsSubject->onNext(std::move(task.stats_));
68 }
69
70 size_t ThreadPoolExecutor::numThreads() {
71   RWSpinLock::ReadHolder{&threadListLock_};
72   return threadList_.get().size();
73 }
74
75 void ThreadPoolExecutor::setNumThreads(size_t n) {
76   RWSpinLock::WriteHolder{&threadListLock_};
77   const auto current = threadList_.get().size();
78   if (n > current ) {
79     addThreads(n - current);
80   } else if (n < current) {
81     removeThreads(current - n, true);
82   }
83   CHECK(threadList_.get().size() == n);
84 }
85
86 // threadListLock_ is writelocked
87 void ThreadPoolExecutor::addThreads(size_t n) {
88   std::vector<ThreadPtr> newThreads;
89   for (size_t i = 0; i < n; i++) {
90     newThreads.push_back(makeThread());
91   }
92   for (auto& thread : newThreads) {
93     // TODO need a notion of failing to create the thread
94     // and then handling for that case
95     thread->handle = threadFactory_->newThread(
96         std::bind(&ThreadPoolExecutor::threadRun, this, thread));
97     threadList_.add(thread);
98   }
99   for (auto& thread : newThreads) {
100     thread->startupBaton.wait();
101   }
102   for (auto& o : observers_) {
103     for (auto& thread : newThreads) {
104       o->threadStarted(thread.get());
105     }
106   }
107 }
108
109 // threadListLock_ is writelocked
110 void ThreadPoolExecutor::removeThreads(size_t n, bool isJoin) {
111   CHECK(n <= threadList_.get().size());
112   CHECK(stoppedThreads_.size() == 0);
113   isJoin_ = isJoin;
114   stopThreads(n);
115   for (size_t i = 0; i < n; i++) {
116     auto thread = stoppedThreads_.take();
117     thread->handle.join();
118     threadList_.remove(thread);
119   }
120   CHECK(stoppedThreads_.size() == 0);
121 }
122
123 void ThreadPoolExecutor::stop() {
124   RWSpinLock::WriteHolder{&threadListLock_};
125   removeThreads(threadList_.get().size(), false);
126   CHECK(threadList_.get().size() == 0);
127 }
128
129 void ThreadPoolExecutor::join() {
130   RWSpinLock::WriteHolder{&threadListLock_};
131   removeThreads(threadList_.get().size(), true);
132   CHECK(threadList_.get().size() == 0);
133 }
134
135 ThreadPoolExecutor::PoolStats ThreadPoolExecutor::getPoolStats() {
136   RWSpinLock::ReadHolder{&threadListLock_};
137   ThreadPoolExecutor::PoolStats stats;
138   stats.threadCount = threadList_.get().size();
139   for (auto thread : threadList_.get()) {
140     if (thread->idle) {
141       stats.idleThreadCount++;
142     } else {
143       stats.activeThreadCount++;
144     }
145   }
146   stats.pendingTaskCount = getPendingTaskCount();
147   stats.totalTaskCount = stats.pendingTaskCount + stats.activeThreadCount;
148   return stats;
149 }
150
151 std::atomic<uint64_t> ThreadPoolExecutor::Thread::nextId(0);
152
153 void ThreadPoolExecutor::StoppedThreadQueue::add(
154     ThreadPoolExecutor::ThreadPtr item) {
155   std::lock_guard<std::mutex> guard(mutex_);
156   queue_.push(std::move(item));
157   sem_.post();
158 }
159
160 ThreadPoolExecutor::ThreadPtr ThreadPoolExecutor::StoppedThreadQueue::take() {
161   while(1) {
162     {
163       std::lock_guard<std::mutex> guard(mutex_);
164       if (queue_.size() > 0) {
165         auto item = std::move(queue_.front());
166         queue_.pop();
167         return item;
168       }
169     }
170     sem_.wait();
171   }
172 }
173
174 size_t ThreadPoolExecutor::StoppedThreadQueue::size() {
175   std::lock_guard<std::mutex> guard(mutex_);
176   return queue_.size();
177 }
178
179 void ThreadPoolExecutor::addObserver(std::shared_ptr<Observer> o) {
180   RWSpinLock::ReadHolder{&threadListLock_};
181   observers_.push_back(o);
182   for (auto& thread : threadList_.get()) {
183     o->threadPreviouslyStarted(thread.get());
184   }
185 }
186
187 void ThreadPoolExecutor::removeObserver(std::shared_ptr<Observer> o) {
188   RWSpinLock::ReadHolder{&threadListLock_};
189   for (auto& thread : threadList_.get()) {
190     o->threadNotYetStopped(thread.get());
191   }
192
193   for (auto it = observers_.begin(); it != observers_.end(); it++) {
194     if (*it == o) {
195       observers_.erase(it);
196       return;
197     }
198   }
199   DCHECK(false);
200 }
201
202 }} // folly::wangle