2 * Copyright 2017 Facebook, Inc.
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
8 * http://www.apache.org/licenses/LICENSE-2.0
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
19 #include <folly/executors/ThreadPoolExecutor.h>
24 * A thread pool for CPU-bound tasks.
26 * @note A single queue backed by folly/LifoSem and folly/MPMC queue.
27 * Because of this, contention can be quite high,
28 * since all the worker threads and all the producer threads hit
29 * the same queue. MPMC queue excels in this situation but dictates a max queue size.
32 * @note The default queue throws when full (folly::QueueBehaviorIfFull::THROW),
33 * so add() can fail. Furthermore, join() can also fail if the queue is full,
34 * because it enqueues numThreads poison tasks to stop the threads. If join()
35 * must be guaranteed to succeed, PriorityLifoSemMPMCQueue can be used
36 * instead, initializing the lowest priority's (LO_PRI) capacity to at least
37 * numThreads. Poisons use LO_PRI, so if that priority is not used for any user
38 * task, join() is guaranteed not to encounter a full queue.
40 * @note If a blocking queue (folly::QueueBehaviorIfFull::BLOCK) is used, and
41 * tasks executing on a given thread pool schedule more tasks, deadlock is
42 * possible if the queue becomes full. Deadlock is also possible if there is
43 * a circular dependency among multiple thread pools with blocking queues.
44 * To avoid this situation, use non-blocking queue(s), or schedule tasks only
45 * from threads not belonging to the given thread pool(s), or use
46 * folly::IOThreadPoolExecutor.
48 * @note LifoSem wakes up threads in LIFO order - i.e. only as few threads
49 * as necessary are running, and we always try to reuse the same few threads
50 * for better cache locality.
51 * Inactive threads have their stack madvised away. This works quite well in
52 * combination with LifoSem - it almost doesn't matter if more threads than are
53 * necessary are specified at startup.
55 * @note stop() will finish all outstanding tasks at exit.
57 * @note Supports priorities - priorities are implemented as multiple queues -
58 * each worker thread checks the highest priority queue first. Threads
59 * themselves don't have priorities set, so a series of long-running
60 * low-priority tasks could still hog all the threads. (At last check, pthreads
61 * thread priorities didn't work very well.)
63 class CPUThreadPoolExecutor : public ThreadPoolExecutor {
67 CPUThreadPoolExecutor(
69 std::unique_ptr<BlockingQueue<CPUTask>> taskQueue,
70 std::shared_ptr<ThreadFactory> threadFactory =
71 std::make_shared<NamedThreadFactory>("CPUThreadPool"));
73 explicit CPUThreadPoolExecutor(size_t numThreads);
75 CPUThreadPoolExecutor(
77 std::shared_ptr<ThreadFactory> threadFactory);
79 CPUThreadPoolExecutor(
82 std::shared_ptr<ThreadFactory> threadFactory =
83 std::make_shared<NamedThreadFactory>("CPUThreadPool"));
85 CPUThreadPoolExecutor(
89 std::shared_ptr<ThreadFactory> threadFactory =
90 std::make_shared<NamedThreadFactory>("CPUThreadPool"));
92 ~CPUThreadPoolExecutor() override;
94 void add(Func func) override;
97 std::chrono::milliseconds expiration,
98 Func expireCallback = nullptr) override;
100 void addWithPriority(Func func, int8_t priority) override;
104 std::chrono::milliseconds expiration,
105 Func expireCallback = nullptr);
107 uint8_t getNumPriorities() const override;
109 struct CPUTask : public ThreadPoolExecutor::Task {
110 // Must be noexcept move constructible so it can be used in MPMCQueue
114 std::chrono::milliseconds expiration,
115 Func&& expireCallback)
116 : Task(std::move(f), expiration, std::move(expireCallback)),
119 : Task(nullptr, std::chrono::milliseconds(0), nullptr), poison(true) {}
124 static const size_t kDefaultMaxQueueSize;
127 BlockingQueue<CPUTask>* getTaskQueue();
130 void threadRun(ThreadPtr thread) override;
131 void stopThreads(size_t n) override;
132 uint64_t getPendingTaskCountImpl(const RWSpinLock::ReadHolder&) override;
134 std::unique_ptr<BlockingQueue<CPUTask>> taskQueue_;
135 std::atomic<ssize_t> threadsToStop_{0};