Copyright 2014->2015
[folly.git] / folly / wangle / concurrent / CPUThreadPoolExecutor.cpp
1 /*
2  * Copyright 2015 Facebook, Inc.
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  *   http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16
17 #include <folly/wangle/concurrent/CPUThreadPoolExecutor.h>
18 #include <folly/wangle/concurrent/PriorityLifoSemMPMCQueue.h>
19
20 namespace folly { namespace wangle {
21
22 const size_t CPUThreadPoolExecutor::kDefaultMaxQueueSize = 1 << 18;
23 const size_t CPUThreadPoolExecutor::kDefaultNumPriorities = 2;
24
25 CPUThreadPoolExecutor::CPUThreadPoolExecutor(
26     size_t numThreads,
27     std::unique_ptr<BlockingQueue<CPUTask>> taskQueue,
28     std::shared_ptr<ThreadFactory> threadFactory)
29     : ThreadPoolExecutor(numThreads, std::move(threadFactory)),
30       taskQueue_(std::move(taskQueue)) {
31   addThreads(numThreads);
32   CHECK(threadList_.get().size() == numThreads);
33 }
34
35 CPUThreadPoolExecutor::CPUThreadPoolExecutor(
36     size_t numThreads,
37     std::shared_ptr<ThreadFactory> threadFactory)
38     : CPUThreadPoolExecutor(
39           numThreads,
40           folly::make_unique<LifoSemMPMCQueue<CPUTask>>(
41               CPUThreadPoolExecutor::kDefaultMaxQueueSize),
42           std::move(threadFactory)) {}
43
44 CPUThreadPoolExecutor::CPUThreadPoolExecutor(size_t numThreads)
45     : CPUThreadPoolExecutor(
46           numThreads,
47           std::make_shared<NamedThreadFactory>("CPUThreadPool")) {}
48
49 CPUThreadPoolExecutor::CPUThreadPoolExecutor(
50     size_t numThreads,
51     uint32_t numPriorities,
52     std::shared_ptr<ThreadFactory> threadFactory)
53     : CPUThreadPoolExecutor(
54           numThreads,
55           folly::make_unique<PriorityLifoSemMPMCQueue<CPUTask>>(
56               numPriorities,
57               CPUThreadPoolExecutor::kDefaultMaxQueueSize),
58           std::move(threadFactory)) {}
59
60 CPUThreadPoolExecutor::~CPUThreadPoolExecutor() {
61   stop();
62   CHECK(threadsToStop_ == 0);
63 }
64
65 void CPUThreadPoolExecutor::add(Func func) {
66   add(std::move(func), std::chrono::milliseconds(0));
67 }
68
69 void CPUThreadPoolExecutor::add(
70     Func func,
71     std::chrono::milliseconds expiration,
72     Func expireCallback) {
73   // TODO handle enqueue failure, here and in other add() callsites
74   taskQueue_->add(
75       CPUTask(std::move(func), expiration, std::move(expireCallback)));
76 }
77
78 void CPUThreadPoolExecutor::add(Func func, uint32_t priority) {
79   add(std::move(func), priority, std::chrono::milliseconds(0));
80 }
81
82 void CPUThreadPoolExecutor::add(
83     Func func,
84     uint32_t priority,
85     std::chrono::milliseconds expiration,
86     Func expireCallback) {
87   CHECK(priority < getNumPriorities());
88   taskQueue_->addWithPriority(
89       CPUTask(std::move(func), expiration, std::move(expireCallback)),
90       priority);
91 }
92
93 uint32_t CPUThreadPoolExecutor::getNumPriorities() const {
94   return taskQueue_->getNumPriorities();
95 }
96
97 BlockingQueue<CPUThreadPoolExecutor::CPUTask>*
98 CPUThreadPoolExecutor::getTaskQueue() {
99   return taskQueue_.get();
100 }
101
102 void CPUThreadPoolExecutor::threadRun(std::shared_ptr<Thread> thread) {
103   thread->startupBaton.post();
104   while (1) {
105     auto task = taskQueue_->take();
106     if (UNLIKELY(task.poison)) {
107       CHECK(threadsToStop_-- > 0);
108       for (auto& o : observers_) {
109         o->threadStopped(thread.get());
110       }
111
112       stoppedThreads_.add(thread);
113       return;
114     } else {
115       runTask(thread, std::move(task));
116     }
117
118     if (UNLIKELY(threadsToStop_ > 0 && !isJoin_)) {
119       if (--threadsToStop_ >= 0) {
120         stoppedThreads_.add(thread);
121         return;
122       } else {
123         threadsToStop_++;
124       }
125     }
126   }
127 }
128
129 void CPUThreadPoolExecutor::stopThreads(size_t n) {
130   CHECK(stoppedThreads_.size() == 0);
131   threadsToStop_ = n;
132   for (size_t i = 0; i < n; i++) {
133     taskQueue_->add(CPUTask());
134   }
135 }
136
137 uint64_t CPUThreadPoolExecutor::getPendingTaskCount() {
138   return taskQueue_->size();
139 }
140
141 }} // folly::wangle