move wangle/concurrent to folly/executors
[folly.git] / folly / executors / CPUThreadPoolExecutor.h
1 /*
2  * Copyright 2017 Facebook, Inc.
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  *   http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16
17 #pragma once
18
19 #include <folly/executors/ThreadPoolExecutor.h>
20
21 namespace folly {
22
23 /**
24  * A Thread pool for CPU bound tasks.
25  *
26  * @note A single queue backed by folly/LifoSem and folly/MPMC queue.
27  * Because of this contention can be quite high,
28  * since all the worker threads and all the producer threads hit
29  * the same queue. MPMC queue excels in this situation but dictates a max queue
30  * size.
31  *
32  * @note If a blocking queue (folly::QueueBehaviorIfFull::BLOCK) is used, and
33  * tasks executing on a given thread pool schedule more tasks, deadlock is
34  * possible if the queue becomes full.  Deadlock is also possible if there is
35  * a circular dependency among multiple thread pools with blocking queues.
36  * To avoid this situation, use non-blocking queue(s), or schedule tasks only
37  * from threads not belonging to the given thread pool(s), or use
38  * folly::IOThreadPoolExecutor.
39  *
40  * @note LifoSem wakes up threads in Lifo order - i.e. there are only few
41  * threads as necessary running, and we always try to reuse the same few threads
42  * for better cache locality.
43  * Inactive threads have their stack madvised away. This works quite well in
44  * combination with Lifosem - it almost doesn't matter if more threads than are
45  * necessary are specified at startup.
46  *
47  * @note stop() will finish all outstanding tasks at exit.
48  *
49  * @note Supports priorities - priorities are implemented as multiple queues -
50  * each worker thread checks the highest priority queue first. Threads
51  * themselves don't have priorities set, so a series of long running low
52  * priority tasks could still hog all the threads. (at last check pthreads
53  * thread priorities didn't work very well).
54  */
55 class CPUThreadPoolExecutor : public ThreadPoolExecutor {
56  public:
57   struct CPUTask;
58
59   CPUThreadPoolExecutor(
60       size_t numThreads,
61       std::unique_ptr<BlockingQueue<CPUTask>> taskQueue,
62       std::shared_ptr<ThreadFactory> threadFactory =
63           std::make_shared<NamedThreadFactory>("CPUThreadPool"));
64
65   explicit CPUThreadPoolExecutor(size_t numThreads);
66
67   CPUThreadPoolExecutor(
68       size_t numThreads,
69       std::shared_ptr<ThreadFactory> threadFactory);
70
71   CPUThreadPoolExecutor(
72       size_t numThreads,
73       int8_t numPriorities,
74       std::shared_ptr<ThreadFactory> threadFactory =
75           std::make_shared<NamedThreadFactory>("CPUThreadPool"));
76
77   CPUThreadPoolExecutor(
78       size_t numThreads,
79       int8_t numPriorities,
80       size_t maxQueueSize,
81       std::shared_ptr<ThreadFactory> threadFactory =
82           std::make_shared<NamedThreadFactory>("CPUThreadPool"));
83
84   ~CPUThreadPoolExecutor() override;
85
86   void add(Func func) override;
87   void add(
88       Func func,
89       std::chrono::milliseconds expiration,
90       Func expireCallback = nullptr) override;
91
92   void addWithPriority(Func func, int8_t priority) override;
93   void add(
94       Func func,
95       int8_t priority,
96       std::chrono::milliseconds expiration,
97       Func expireCallback = nullptr);
98
99   uint8_t getNumPriorities() const override;
100
101   struct CPUTask : public ThreadPoolExecutor::Task {
102     // Must be noexcept move constructible so it can be used in MPMCQueue
103
104     explicit CPUTask(
105         Func&& f,
106         std::chrono::milliseconds expiration,
107         Func&& expireCallback)
108         : Task(std::move(f), expiration, std::move(expireCallback)),
109           poison(false) {}
110     CPUTask()
111         : Task(nullptr, std::chrono::milliseconds(0), nullptr), poison(true) {}
112
113     bool poison;
114   };
115
116   static const size_t kDefaultMaxQueueSize;
117
118  protected:
119   BlockingQueue<CPUTask>* getTaskQueue();
120
121  private:
122   void threadRun(ThreadPtr thread) override;
123   void stopThreads(size_t n) override;
124   uint64_t getPendingTaskCountImpl(const RWSpinLock::ReadHolder&) override;
125
126   std::unique_ptr<BlockingQueue<CPUTask>> taskQueue_;
127   std::atomic<ssize_t> threadsToStop_{0};
128 };
129
130 } // namespace folly