Fix copyright lines
[folly.git] / folly / executors / CPUThreadPoolExecutor.h
1 /*
2  * Copyright 2017-present Facebook, Inc.
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  *   http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16
17 #pragma once
18
19 #include <folly/executors/ThreadPoolExecutor.h>
20
21 namespace folly {
22
23 /**
24  * A Thread pool for CPU bound tasks.
25  *
26  * @note A single queue backed by folly/LifoSem and folly/MPMC queue.
27  * Because of this contention can be quite high,
28  * since all the worker threads and all the producer threads hit
29  * the same queue. MPMC queue excels in this situation but dictates a max queue
30  * size.
31  *
32  * @note The default queue throws when full (folly::QueueBehaviorIfFull::THROW),
33  * so add() can fail. Furthermore, join() can also fail if the queue is full,
34  * because it enqueues numThreads poison tasks to stop the threads. If join() is
35  * needed to be guaranteed to succeed PriorityLifoSemMPMCQueue can be used
36  * instead, initializing the lowest priority's (LO_PRI) capacity to at least
37  * numThreads. Poisons use LO_PRI so if that priority is not used for any user
38  * task join() is guaranteed not to encounter a full queue.
39  *
40  * @note If a blocking queue (folly::QueueBehaviorIfFull::BLOCK) is used, and
41  * tasks executing on a given thread pool schedule more tasks, deadlock is
42  * possible if the queue becomes full.  Deadlock is also possible if there is
43  * a circular dependency among multiple thread pools with blocking queues.
44  * To avoid this situation, use non-blocking queue(s), or schedule tasks only
45  * from threads not belonging to the given thread pool(s), or use
46  * folly::IOThreadPoolExecutor.
47  *
48  * @note LifoSem wakes up threads in Lifo order - i.e. there are only few
49  * threads as necessary running, and we always try to reuse the same few threads
50  * for better cache locality.
51  * Inactive threads have their stack madvised away. This works quite well in
52  * combination with Lifosem - it almost doesn't matter if more threads than are
53  * necessary are specified at startup.
54  *
55  * @note stop() will finish all outstanding tasks at exit.
56  *
57  * @note Supports priorities - priorities are implemented as multiple queues -
58  * each worker thread checks the highest priority queue first. Threads
59  * themselves don't have priorities set, so a series of long running low
60  * priority tasks could still hog all the threads. (at last check pthreads
61  * thread priorities didn't work very well).
62  */
63 class CPUThreadPoolExecutor : public ThreadPoolExecutor {
64  public:
65   struct CPUTask;
66
67   CPUThreadPoolExecutor(
68       size_t numThreads,
69       std::unique_ptr<BlockingQueue<CPUTask>> taskQueue,
70       std::shared_ptr<ThreadFactory> threadFactory =
71           std::make_shared<NamedThreadFactory>("CPUThreadPool"));
72
73   explicit CPUThreadPoolExecutor(size_t numThreads);
74
75   CPUThreadPoolExecutor(
76       size_t numThreads,
77       std::shared_ptr<ThreadFactory> threadFactory);
78
79   CPUThreadPoolExecutor(
80       size_t numThreads,
81       int8_t numPriorities,
82       std::shared_ptr<ThreadFactory> threadFactory =
83           std::make_shared<NamedThreadFactory>("CPUThreadPool"));
84
85   CPUThreadPoolExecutor(
86       size_t numThreads,
87       int8_t numPriorities,
88       size_t maxQueueSize,
89       std::shared_ptr<ThreadFactory> threadFactory =
90           std::make_shared<NamedThreadFactory>("CPUThreadPool"));
91
92   ~CPUThreadPoolExecutor() override;
93
94   void add(Func func) override;
95   void add(
96       Func func,
97       std::chrono::milliseconds expiration,
98       Func expireCallback = nullptr) override;
99
100   void addWithPriority(Func func, int8_t priority) override;
101   void add(
102       Func func,
103       int8_t priority,
104       std::chrono::milliseconds expiration,
105       Func expireCallback = nullptr);
106
107   uint8_t getNumPriorities() const override;
108
109   struct CPUTask : public ThreadPoolExecutor::Task {
110     // Must be noexcept move constructible so it can be used in MPMCQueue
111
112     explicit CPUTask(
113         Func&& f,
114         std::chrono::milliseconds expiration,
115         Func&& expireCallback)
116         : Task(std::move(f), expiration, std::move(expireCallback)),
117           poison(false) {}
118     CPUTask()
119         : Task(nullptr, std::chrono::milliseconds(0), nullptr), poison(true) {}
120
121     bool poison;
122   };
123
124   static const size_t kDefaultMaxQueueSize;
125
126  protected:
127   BlockingQueue<CPUTask>* getTaskQueue();
128
129  private:
130   void threadRun(ThreadPtr thread) override;
131   void stopThreads(size_t n) override;
132   uint64_t getPendingTaskCountImpl(const RWSpinLock::ReadHolder&) override;
133
134   std::unique_ptr<BlockingQueue<CPUTask>> taskQueue_;
135   std::atomic<ssize_t> threadsToStop_{0};
136 };
137
138 } // namespace folly