Log (de)compression bytes
[folly.git] / folly / executors / CPUThreadPoolExecutor.cpp
1 /*
2  * Copyright 2017-present Facebook, Inc.
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  *   http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16
17 #include <folly/executors/CPUThreadPoolExecutor.h>
18 #include <folly/executors/task_queue/PriorityLifoSemMPMCQueue.h>
19
20 namespace folly {
21
22 const size_t CPUThreadPoolExecutor::kDefaultMaxQueueSize = 1 << 14;
23
24 CPUThreadPoolExecutor::CPUThreadPoolExecutor(
25     size_t numThreads,
26     std::unique_ptr<BlockingQueue<CPUTask>> taskQueue,
27     std::shared_ptr<ThreadFactory> threadFactory)
28     : ThreadPoolExecutor(numThreads, std::move(threadFactory)),
29       taskQueue_(std::move(taskQueue)) {
30   setNumThreads(numThreads);
31 }
32
33 CPUThreadPoolExecutor::CPUThreadPoolExecutor(
34     size_t numThreads,
35     std::shared_ptr<ThreadFactory> threadFactory)
36     : CPUThreadPoolExecutor(
37           numThreads,
38           std::make_unique<LifoSemMPMCQueue<CPUTask>>(
39               CPUThreadPoolExecutor::kDefaultMaxQueueSize),
40           std::move(threadFactory)) {}
41
42 CPUThreadPoolExecutor::CPUThreadPoolExecutor(size_t numThreads)
43     : CPUThreadPoolExecutor(
44           numThreads,
45           std::make_shared<NamedThreadFactory>("CPUThreadPool")) {}
46
47 CPUThreadPoolExecutor::CPUThreadPoolExecutor(
48     size_t numThreads,
49     int8_t numPriorities,
50     std::shared_ptr<ThreadFactory> threadFactory)
51     : CPUThreadPoolExecutor(
52           numThreads,
53           std::make_unique<PriorityLifoSemMPMCQueue<CPUTask>>(
54               numPriorities,
55               CPUThreadPoolExecutor::kDefaultMaxQueueSize),
56           std::move(threadFactory)) {}
57
58 CPUThreadPoolExecutor::CPUThreadPoolExecutor(
59     size_t numThreads,
60     int8_t numPriorities,
61     size_t maxQueueSize,
62     std::shared_ptr<ThreadFactory> threadFactory)
63     : CPUThreadPoolExecutor(
64           numThreads,
65           std::make_unique<PriorityLifoSemMPMCQueue<CPUTask>>(
66               numPriorities,
67               maxQueueSize),
68           std::move(threadFactory)) {}
69
70 CPUThreadPoolExecutor::~CPUThreadPoolExecutor() {
71   stop();
72   CHECK(threadsToStop_ == 0);
73 }
74
75 void CPUThreadPoolExecutor::add(Func func) {
76   add(std::move(func), std::chrono::milliseconds(0));
77 }
78
79 void CPUThreadPoolExecutor::add(
80     Func func,
81     std::chrono::milliseconds expiration,
82     Func expireCallback) {
83   // TODO handle enqueue failure, here and in other add() callsites
84   taskQueue_->add(
85       CPUTask(std::move(func), expiration, std::move(expireCallback)));
86 }
87
88 void CPUThreadPoolExecutor::addWithPriority(Func func, int8_t priority) {
89   add(std::move(func), priority, std::chrono::milliseconds(0));
90 }
91
92 void CPUThreadPoolExecutor::add(
93     Func func,
94     int8_t priority,
95     std::chrono::milliseconds expiration,
96     Func expireCallback) {
97   CHECK(getNumPriorities() > 0);
98   taskQueue_->addWithPriority(
99       CPUTask(std::move(func), expiration, std::move(expireCallback)),
100       priority);
101 }
102
103 uint8_t CPUThreadPoolExecutor::getNumPriorities() const {
104   return taskQueue_->getNumPriorities();
105 }
106
107 BlockingQueue<CPUThreadPoolExecutor::CPUTask>*
108 CPUThreadPoolExecutor::getTaskQueue() {
109   return taskQueue_.get();
110 }
111
112 void CPUThreadPoolExecutor::threadRun(std::shared_ptr<Thread> thread) {
113   this->threadPoolHook_.registerThread();
114
115   thread->startupBaton.post();
116   while (true) {
117     auto task = taskQueue_->take();
118     if (UNLIKELY(task.poison)) {
119       CHECK(threadsToStop_-- > 0);
120       for (auto& o : observers_) {
121         o->threadStopped(thread.get());
122       }
123       folly::RWSpinLock::WriteHolder w{&threadListLock_};
124       threadList_.remove(thread);
125       stoppedThreads_.add(thread);
126       return;
127     } else {
128       runTask(thread, std::move(task));
129     }
130
131     if (UNLIKELY(threadsToStop_ > 0 && !isJoin_)) {
132       if (--threadsToStop_ >= 0) {
133         folly::RWSpinLock::WriteHolder w{&threadListLock_};
134         threadList_.remove(thread);
135         stoppedThreads_.add(thread);
136         return;
137       } else {
138         threadsToStop_++;
139       }
140     }
141   }
142 }
143
144 void CPUThreadPoolExecutor::stopThreads(size_t n) {
145   threadsToStop_ += n;
146   for (size_t i = 0; i < n; i++) {
147     taskQueue_->addWithPriority(CPUTask(), Executor::LO_PRI);
148   }
149 }
150
151 // threadListLock_ is readlocked
152 uint64_t CPUThreadPoolExecutor::getPendingTaskCountImpl(
153     const folly::RWSpinLock::ReadHolder&) {
154   return taskQueue_->size();
155 }
156
157 } // namespace folly