2 * Copyright 2017 Facebook, Inc.
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
8 * http://www.apache.org/licenses/LICENSE-2.0
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
19 #include <folly/executors/ThreadPoolExecutor.h>
24 * A Thread pool for CPU bound tasks.
26 * @note A single queue backed by folly/LifoSem and folly/MPMC queue.
27 * Because of this, contention can be quite high,
28 * since all the worker threads and all the producer threads hit
29 * the same queue. MPMC queue excels in this situation but dictates a max queue size.
32 * @note If a blocking queue (folly::QueueBehaviorIfFull::BLOCK) is used, and
33 * tasks executing on a given thread pool schedule more tasks, deadlock is
34 * possible if the queue becomes full. Deadlock is also possible if there is
35 * a circular dependency among multiple thread pools with blocking queues.
36 * To avoid this situation, use non-blocking queue(s), or schedule tasks only
37 * from threads not belonging to the given thread pool(s), or use
38 * folly::IOThreadPoolExecutor.
40 * @note LifoSem wakes up threads in LIFO order - i.e. only as few threads
41 * as necessary are running, and we always try to reuse the same few threads
42 * for better cache locality.
43 * Inactive threads have their stack madvised away. This works quite well in
44 * combination with LifoSem - it almost doesn't matter if more threads than are
45 * necessary are specified at startup.
47 * @note stop() will finish all outstanding tasks at exit.
49 * @note Supports priorities - priorities are implemented as multiple queues -
50 * each worker thread checks the highest priority queue first. Threads
51 * themselves don't have priorities set, so a series of long running low
52 * priority tasks could still hog all the threads. (At last check, pthreads
53 * thread priorities didn't work very well.)
55 class CPUThreadPoolExecutor : public ThreadPoolExecutor {
59 CPUThreadPoolExecutor(
61 std::unique_ptr<BlockingQueue<CPUTask>> taskQueue,
62 std::shared_ptr<ThreadFactory> threadFactory =
63 std::make_shared<NamedThreadFactory>("CPUThreadPool"));
65 explicit CPUThreadPoolExecutor(size_t numThreads);
67 CPUThreadPoolExecutor(
69 std::shared_ptr<ThreadFactory> threadFactory);
71 CPUThreadPoolExecutor(
74 std::shared_ptr<ThreadFactory> threadFactory =
75 std::make_shared<NamedThreadFactory>("CPUThreadPool"));
77 CPUThreadPoolExecutor(
81 std::shared_ptr<ThreadFactory> threadFactory =
82 std::make_shared<NamedThreadFactory>("CPUThreadPool"));
84 ~CPUThreadPoolExecutor() override;
86 void add(Func func) override;
89 std::chrono::milliseconds expiration,
90 Func expireCallback = nullptr) override;
92 void addWithPriority(Func func, int8_t priority) override;
96 std::chrono::milliseconds expiration,
97 Func expireCallback = nullptr);
99 uint8_t getNumPriorities() const override;
101 struct CPUTask : public ThreadPoolExecutor::Task {
102 // Must be noexcept move constructible so it can be used in MPMCQueue
106 std::chrono::milliseconds expiration,
107 Func&& expireCallback)
108 : Task(std::move(f), expiration, std::move(expireCallback)),
111 : Task(nullptr, std::chrono::milliseconds(0), nullptr), poison(true) {}
116 static const size_t kDefaultMaxQueueSize;
119 BlockingQueue<CPUTask>* getTaskQueue();
122 void threadRun(ThreadPtr thread) override;
123 void stopThreads(size_t n) override;
124 uint64_t getPendingTaskCountImpl(const RWSpinLock::ReadHolder&) override;
126 std::unique_ptr<BlockingQueue<CPUTask>> taskQueue_;
127 std::atomic<ssize_t> threadsToStop_{0};