--- /dev/null
+/*
+ * Copyright 2015 Facebook, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include <folly/wangle/concurrent/CPUThreadPoolExecutor.h>
+#include <folly/wangle/concurrent/PriorityLifoSemMPMCQueue.h>
+
+namespace folly { namespace wangle {
+
+const size_t CPUThreadPoolExecutor::kDefaultMaxQueueSize = 1 << 14;
+
+CPUThreadPoolExecutor::CPUThreadPoolExecutor(
+ size_t numThreads,
+ std::unique_ptr<BlockingQueue<CPUTask>> taskQueue,
+ std::shared_ptr<ThreadFactory> threadFactory)
+ : ThreadPoolExecutor(numThreads, std::move(threadFactory)),
+ taskQueue_(std::move(taskQueue)) {
+ addThreads(numThreads);
+ CHECK(threadList_.get().size() == numThreads);
+}
+
+CPUThreadPoolExecutor::CPUThreadPoolExecutor(
+ size_t numThreads,
+ std::shared_ptr<ThreadFactory> threadFactory)
+ : CPUThreadPoolExecutor(
+ numThreads,
+ folly::make_unique<LifoSemMPMCQueue<CPUTask>>(
+ CPUThreadPoolExecutor::kDefaultMaxQueueSize),
+ std::move(threadFactory)) {}
+
+CPUThreadPoolExecutor::CPUThreadPoolExecutor(size_t numThreads)
+ : CPUThreadPoolExecutor(
+ numThreads,
+ std::make_shared<NamedThreadFactory>("CPUThreadPool")) {}
+
+CPUThreadPoolExecutor::CPUThreadPoolExecutor(
+ size_t numThreads,
+ int8_t numPriorities,
+ std::shared_ptr<ThreadFactory> threadFactory)
+ : CPUThreadPoolExecutor(
+ numThreads,
+ folly::make_unique<PriorityLifoSemMPMCQueue<CPUTask>>(
+ numPriorities,
+ CPUThreadPoolExecutor::kDefaultMaxQueueSize),
+ std::move(threadFactory)) {}
+
+CPUThreadPoolExecutor::CPUThreadPoolExecutor(
+ size_t numThreads,
+ int8_t numPriorities,
+ size_t maxQueueSize,
+ std::shared_ptr<ThreadFactory> threadFactory)
+ : CPUThreadPoolExecutor(
+ numThreads,
+ folly::make_unique<PriorityLifoSemMPMCQueue<CPUTask>>(
+ numPriorities,
+ maxQueueSize),
+ std::move(threadFactory)) {}
+
+CPUThreadPoolExecutor::~CPUThreadPoolExecutor() {
+ stop();
+ CHECK(threadsToStop_ == 0);
+}
+
+void CPUThreadPoolExecutor::add(Func func) {
+ add(std::move(func), std::chrono::milliseconds(0));
+}
+
+void CPUThreadPoolExecutor::add(
+ Func func,
+ std::chrono::milliseconds expiration,
+ Func expireCallback) {
+ // TODO handle enqueue failure, here and in other add() callsites
+ taskQueue_->add(
+ CPUTask(std::move(func), expiration, std::move(expireCallback)));
+}
+
+void CPUThreadPoolExecutor::addWithPriority(Func func, int8_t priority) {
+ add(std::move(func), priority, std::chrono::milliseconds(0));
+}
+
+void CPUThreadPoolExecutor::add(
+ Func func,
+ int8_t priority,
+ std::chrono::milliseconds expiration,
+ Func expireCallback) {
+ CHECK(getNumPriorities() > 0);
+ taskQueue_->addWithPriority(
+ CPUTask(std::move(func), expiration, std::move(expireCallback)),
+ priority);
+}
+
+uint8_t CPUThreadPoolExecutor::getNumPriorities() const {
+ return taskQueue_->getNumPriorities();
+}
+
+BlockingQueue<CPUThreadPoolExecutor::CPUTask>*
+CPUThreadPoolExecutor::getTaskQueue() {
+ return taskQueue_.get();
+}
+
+void CPUThreadPoolExecutor::threadRun(std::shared_ptr<Thread> thread) {
+ thread->startupBaton.post();
+ while (1) {
+ auto task = taskQueue_->take();
+ if (UNLIKELY(task.poison)) {
+ CHECK(threadsToStop_-- > 0);
+ for (auto& o : observers_) {
+ o->threadStopped(thread.get());
+ }
+
+ stoppedThreads_.add(thread);
+ return;
+ } else {
+ runTask(thread, std::move(task));
+ }
+
+ if (UNLIKELY(threadsToStop_ > 0 && !isJoin_)) {
+ if (--threadsToStop_ >= 0) {
+ stoppedThreads_.add(thread);
+ return;
+ } else {
+ threadsToStop_++;
+ }
+ }
+ }
+}
+
+void CPUThreadPoolExecutor::stopThreads(size_t n) {
+ CHECK(stoppedThreads_.size() == 0);
+ threadsToStop_ = n;
+ for (size_t i = 0; i < n; i++) {
+ taskQueue_->addWithPriority(CPUTask(), Executor::LO_PRI);
+ }
+}
+
+uint64_t CPUThreadPoolExecutor::getPendingTaskCount() {
+ return taskQueue_->size();
+}
+
+}} // folly::wangle