copy wangle back into folly
[folly.git] / folly / wangle / concurrent / CPUThreadPoolExecutor.cpp
1 /*
2  * Copyright 2015 Facebook, Inc.
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  *   http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16
17 #include <folly/wangle/concurrent/CPUThreadPoolExecutor.h>
18 #include <folly/wangle/concurrent/PriorityLifoSemMPMCQueue.h>
19
20 namespace folly { namespace wangle {
21
22 const size_t CPUThreadPoolExecutor::kDefaultMaxQueueSize = 1 << 14;
23
24 CPUThreadPoolExecutor::CPUThreadPoolExecutor(
25     size_t numThreads,
26     std::unique_ptr<BlockingQueue<CPUTask>> taskQueue,
27     std::shared_ptr<ThreadFactory> threadFactory)
28     : ThreadPoolExecutor(numThreads, std::move(threadFactory)),
29       taskQueue_(std::move(taskQueue)) {
30   addThreads(numThreads);
31   CHECK(threadList_.get().size() == numThreads);
32 }
33
34 CPUThreadPoolExecutor::CPUThreadPoolExecutor(
35     size_t numThreads,
36     std::shared_ptr<ThreadFactory> threadFactory)
37     : CPUThreadPoolExecutor(
38           numThreads,
39           folly::make_unique<LifoSemMPMCQueue<CPUTask>>(
40               CPUThreadPoolExecutor::kDefaultMaxQueueSize),
41           std::move(threadFactory)) {}
42
43 CPUThreadPoolExecutor::CPUThreadPoolExecutor(size_t numThreads)
44     : CPUThreadPoolExecutor(
45           numThreads,
46           std::make_shared<NamedThreadFactory>("CPUThreadPool")) {}
47
48 CPUThreadPoolExecutor::CPUThreadPoolExecutor(
49     size_t numThreads,
50     int8_t numPriorities,
51     std::shared_ptr<ThreadFactory> threadFactory)
52     : CPUThreadPoolExecutor(
53           numThreads,
54           folly::make_unique<PriorityLifoSemMPMCQueue<CPUTask>>(
55               numPriorities,
56               CPUThreadPoolExecutor::kDefaultMaxQueueSize),
57           std::move(threadFactory)) {}
58
59 CPUThreadPoolExecutor::CPUThreadPoolExecutor(
60     size_t numThreads,
61     int8_t numPriorities,
62     size_t maxQueueSize,
63     std::shared_ptr<ThreadFactory> threadFactory)
64     : CPUThreadPoolExecutor(
65           numThreads,
66           folly::make_unique<PriorityLifoSemMPMCQueue<CPUTask>>(
67               numPriorities,
68               maxQueueSize),
69           std::move(threadFactory)) {}
70
71 CPUThreadPoolExecutor::~CPUThreadPoolExecutor() {
72   stop();
73   CHECK(threadsToStop_ == 0);
74 }
75
76 void CPUThreadPoolExecutor::add(Func func) {
77   add(std::move(func), std::chrono::milliseconds(0));
78 }
79
80 void CPUThreadPoolExecutor::add(
81     Func func,
82     std::chrono::milliseconds expiration,
83     Func expireCallback) {
84   // TODO handle enqueue failure, here and in other add() callsites
85   taskQueue_->add(
86       CPUTask(std::move(func), expiration, std::move(expireCallback)));
87 }
88
89 void CPUThreadPoolExecutor::addWithPriority(Func func, int8_t priority) {
90   add(std::move(func), priority, std::chrono::milliseconds(0));
91 }
92
93 void CPUThreadPoolExecutor::add(
94     Func func,
95     int8_t priority,
96     std::chrono::milliseconds expiration,
97     Func expireCallback) {
98   CHECK(getNumPriorities() > 0);
99   taskQueue_->addWithPriority(
100       CPUTask(std::move(func), expiration, std::move(expireCallback)),
101       priority);
102 }
103
104 uint8_t CPUThreadPoolExecutor::getNumPriorities() const {
105   return taskQueue_->getNumPriorities();
106 }
107
108 BlockingQueue<CPUThreadPoolExecutor::CPUTask>*
109 CPUThreadPoolExecutor::getTaskQueue() {
110   return taskQueue_.get();
111 }
112
113 void CPUThreadPoolExecutor::threadRun(std::shared_ptr<Thread> thread) {
114   thread->startupBaton.post();
115   while (1) {
116     auto task = taskQueue_->take();
117     if (UNLIKELY(task.poison)) {
118       CHECK(threadsToStop_-- > 0);
119       for (auto& o : observers_) {
120         o->threadStopped(thread.get());
121       }
122
123       stoppedThreads_.add(thread);
124       return;
125     } else {
126       runTask(thread, std::move(task));
127     }
128
129     if (UNLIKELY(threadsToStop_ > 0 && !isJoin_)) {
130       if (--threadsToStop_ >= 0) {
131         stoppedThreads_.add(thread);
132         return;
133       } else {
134         threadsToStop_++;
135       }
136     }
137   }
138 }
139
140 void CPUThreadPoolExecutor::stopThreads(size_t n) {
141   CHECK(stoppedThreads_.size() == 0);
142   threadsToStop_ = n;
143   for (size_t i = 0; i < n; i++) {
144     taskQueue_->addWithPriority(CPUTask(), Executor::LO_PRI);
145   }
146 }
147
148 uint64_t CPUThreadPoolExecutor::getPendingTaskCount() {
149   return taskQueue_->size();
150 }
151
152 }} // folly::wangle