stats for ThreadPoolExecutor
[folly.git] / folly / experimental / wangle / concurrent / CPUThreadPoolExecutor.cpp
1 /*
2  * Copyright 2014 Facebook, Inc.
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  *   http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16
17 #include <folly/experimental/wangle/concurrent/CPUThreadPoolExecutor.h>
18
19 namespace folly { namespace wangle {
20
21 const size_t CPUThreadPoolExecutor::kDefaultMaxQueueSize = 1 << 18;
22
23 CPUThreadPoolExecutor::CPUThreadPoolExecutor(
24     size_t numThreads,
25     std::unique_ptr<BlockingQueue<CPUTask>> taskQueue,
26     std::unique_ptr<ThreadFactory> threadFactory)
27     : ThreadPoolExecutor(numThreads, std::move(threadFactory)),
28       taskQueue_(std::move(taskQueue)) {
29   addThreads(numThreads);
30   CHECK(threadList_.get().size() == numThreads);
31 }
32
33 CPUThreadPoolExecutor::~CPUThreadPoolExecutor() {
34   stop();
35   CHECK(threadsToStop_ == 0);
36 }
37
38 void CPUThreadPoolExecutor::add(Func func) {
39   // TODO handle enqueue failure, here and in other add() callsites
40   taskQueue_->add(CPUTask(std::move(func)));
41 }
42
43 void CPUThreadPoolExecutor::threadRun(std::shared_ptr<Thread> thread) {
44   while (1) {
45     // TODO expiration / codel
46     auto task = taskQueue_->take();
47     if (UNLIKELY(task.poison)) {
48       CHECK(threadsToStop_-- > 0);
49       stoppedThreads_.add(thread);
50       return;
51     } else {
52       runTask(thread, std::move(task));
53     }
54
55     if (UNLIKELY(threadsToStop_ > 0 && !isJoin_)) {
56       if (--threadsToStop_ >= 0) {
57         stoppedThreads_.add(thread);
58         return;
59       } else {
60         threadsToStop_++;
61       }
62     }
63   }
64 }
65
66 void CPUThreadPoolExecutor::stopThreads(size_t n) {
67   CHECK(stoppedThreads_.size() == 0);
68   threadsToStop_ = n;
69   for (int i = 0; i < n; i++) {
70     taskQueue_->add(CPUTask());
71   }
72 }
73
74 uint64_t CPUThreadPoolExecutor::getPendingTaskCount() {
75   return taskQueue_->size();
76 }
77
78 }} // folly::wangle