Revert D6050464: [Folly] Move folly/Hash.h to folly/hash/
[folly.git] / folly / experimental / FunctionScheduler.h
1 /*
2  * Copyright 2017 Facebook, Inc.
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  *   http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16
17 #pragma once
18
19 #include <folly/Function.h>
20 #include <folly/Range.h>
21 #include <folly/Hash.h>
22 #include <chrono>
23 #include <condition_variable>
24 #include <mutex>
25 #include <thread>
26 #include <vector>
27 #include <unordered_map>
28
29 namespace folly {
30
31 /**
32  * Schedules any number of functions to run at various intervals. E.g.,
33  *
34  *   FunctionScheduler fs;
35  *
36  *   fs.addFunction([&] { LOG(INFO) << "tick..."; }, seconds(1), "ticker");
37  *   fs.addFunction(std::bind(&TestClass::doStuff, this), minutes(5), "stuff");
38  *   fs.start();
39  *   ........
40  *   fs.cancelFunction("ticker");
41  *   fs.addFunction([&] { LOG(INFO) << "tock..."; }, minutes(3), "tocker");
42  *   ........
43  *   fs.shutdown();
44  *
45  *
46  * Note: the class uses only one thread - if you want to use more than one
47  *       thread, either use multiple FunctionScheduler objects, or check out
48  *       ThreadedRepeatingFunctionRunner.h for a much simpler contract of
49  *       "run each function periodically in its own thread".
50  *
51  * start() schedules the functions, while shutdown() terminates further
52  * scheduling.
53  */
54 class FunctionScheduler {
55  public:
56   FunctionScheduler();
57   ~FunctionScheduler();
58
59   /**
60    * By default steady is false, meaning schedules may lag behind overtime.
61    * This could be due to long running tasks or time drift because of randomness
62    * in thread wakeup time.
63    * By setting steady to true, FunctionScheduler will attempt to catch up.
64    * i.e. more like a cronjob
65    *
66    * NOTE: it's only safe to set this before calling start()
67    */
68   void setSteady(bool steady) { steady_ = steady; }
69
70   /*
71    * Parameters to control the function interval.
72    *
73    * If isPoisson is true, then use std::poisson_distribution to pick the
74    * interval between each invocation of the function.
75    *
76    * If isPoisson os false, then always use fixed the interval specified to
77    * addFunction().
78    */
79   struct LatencyDistribution {
80     bool isPoisson;
81     double poissonMean;
82
83     LatencyDistribution(bool poisson, double mean)
84       : isPoisson(poisson),
85         poissonMean(mean) {
86     }
87   };
88
89   /**
90    * Adds a new function to the FunctionScheduler.
91    *
92    * Functions will not be run until start() is called.  When start() is
93    * called, each function will be run after its specified startDelay.
94    * Functions may also be added after start() has been called, in which case
95    * startDelay is still honored.
96    *
97    * Throws an exception on error.  In particular, each function must have a
98    * unique name--two functions cannot be added with the same name.
99    */
100   void addFunction(Function<void()>&& cb,
101                    std::chrono::milliseconds interval,
102                    StringPiece nameID = StringPiece(),
103                    std::chrono::milliseconds startDelay =
104                      std::chrono::milliseconds(0));
105
106   /*
107    * Add a new function to the FunctionScheduler with a specified
108    * LatencyDistribution
109    */
110   void addFunction(
111       Function<void()>&& cb,
112       std::chrono::milliseconds interval,
113       const LatencyDistribution& latencyDistr,
114       StringPiece nameID = StringPiece(),
115       std::chrono::milliseconds startDelay = std::chrono::milliseconds(0));
116
117   /**
118    * Adds a new function to the FunctionScheduler to run only once.
119    */
120   void addFunctionOnce(
121       Function<void()>&& cb,
122       StringPiece nameID = StringPiece(),
123       std::chrono::milliseconds startDelay = std::chrono::milliseconds(0));
124
125   /**
126     * Add a new function to the FunctionScheduler with the time
127     * interval being distributed uniformly within the given interval
128     * [minInterval, maxInterval].
129     */
130   void addFunctionUniformDistribution(Function<void()>&& cb,
131                                       std::chrono::milliseconds minInterval,
132                                       std::chrono::milliseconds maxInterval,
133                                       StringPiece nameID,
134                                       std::chrono::milliseconds startDelay);
135
136   /**
137    * A type alias for function that is called to determine the time
138    * interval for the next scheduled run.
139    */
140   using IntervalDistributionFunc = Function<std::chrono::milliseconds()>;
141
142   /**
143    * Add a new function to the FunctionScheduler. The scheduling interval
144    * is determined by the interval distribution functor, which is called
145    * every time the next function execution is scheduled. This allows
146    * for supporting custom interval distribution algorithms in addition
147    * to built in constant interval; and Poisson and jitter distributions
148    * (@see FunctionScheduler::addFunction and
149    * @see FunctionScheduler::addFunctionJitterInterval).
150    */
151   void addFunctionGenericDistribution(
152       Function<void()>&& cb,
153       IntervalDistributionFunc&& intervalFunc,
154       const std::string& nameID,
155       const std::string& intervalDescr,
156       std::chrono::milliseconds startDelay);
157
158   /**
159    * Cancels the function with the specified name, so it will no longer be run.
160    *
161    * Returns false if no function exists with the specified name.
162    */
163   bool cancelFunction(StringPiece nameID);
164   bool cancelFunctionAndWait(StringPiece nameID);
165
166   /**
167    * All functions registered will be canceled.
168    */
169   void cancelAllFunctions();
170   void cancelAllFunctionsAndWait();
171
172   /**
173    * Resets the specified function's timer.
174    * When resetFunctionTimer is called, the specified function's timer will
175    * be reset with the same parameters it was passed initially, including
176    * its startDelay. If the startDelay was 0, the function will be invoked
177    * immediately.
178    *
179    * Returns false if no function exists with the specified name.
180    */
181   bool resetFunctionTimer(StringPiece nameID);
182
183   /**
184    * Starts the scheduler.
185    *
186    * Returns false if the scheduler was already running.
187    */
188   bool start();
189
190   /**
191    * Stops the FunctionScheduler.
192    *
193    * It may be restarted later by calling start() again.
194    * Returns false if the scheduler was not running.
195    */
196   bool shutdown();
197
198   /**
199    * Set the name of the worker thread.
200    */
201   void setThreadName(StringPiece threadName);
202
203  private:
204   struct RepeatFunc {
205     Function<void()> cb;
206     IntervalDistributionFunc intervalFunc;
207     std::chrono::steady_clock::time_point nextRunTime;
208     std::string name;
209     std::chrono::milliseconds startDelay;
210     std::string intervalDescr;
211     bool runOnce;
212
213     RepeatFunc(
214         Function<void()>&& cback,
215         IntervalDistributionFunc&& intervalFn,
216         const std::string& nameID,
217         const std::string& intervalDistDescription,
218         std::chrono::milliseconds delay,
219         bool once)
220         : cb(std::move(cback)),
221           intervalFunc(std::move(intervalFn)),
222           nextRunTime(),
223           name(nameID),
224           startDelay(delay),
225           intervalDescr(intervalDistDescription),
226           runOnce(once) {}
227
228     std::chrono::steady_clock::time_point getNextRunTime() const {
229       return nextRunTime;
230     }
231     void setNextRunTimeStrict(std::chrono::steady_clock::time_point curTime) {
232       nextRunTime = curTime + intervalFunc();
233     }
234     void setNextRunTimeSteady() { nextRunTime += intervalFunc(); }
235     void resetNextRunTime(std::chrono::steady_clock::time_point curTime) {
236       nextRunTime = curTime + startDelay;
237     }
238     void cancel() {
239       // Simply reset cb to an empty function.
240       cb = {};
241     }
242     bool isValid() const { return bool(cb); }
243   };
244
245   struct RunTimeOrder {
246     bool operator()(const std::unique_ptr<RepeatFunc>& f1, const std::unique_ptr<RepeatFunc>& f2) const {
247       return f1->getNextRunTime() > f2->getNextRunTime();
248     }
249   };
250
251   typedef std::vector<std::unique_ptr<RepeatFunc>> FunctionHeap;
252   typedef std::unordered_map<StringPiece, RepeatFunc*, Hash> FunctionMap;
253
254   void run();
255   void runOneFunction(std::unique_lock<std::mutex>& lock,
256                       std::chrono::steady_clock::time_point now);
257   void cancelFunction(const std::unique_lock<std::mutex>& lock,
258                       RepeatFunc* it);
259   void addFunctionToHeap(const std::unique_lock<std::mutex>& lock,
260                          std::unique_ptr<RepeatFunc> func);
261
262   void addFunctionInternal(
263       Function<void()>&& cb,
264       IntervalDistributionFunc&& intervalFunc,
265       const std::string& nameID,
266       const std::string& intervalDescr,
267       std::chrono::milliseconds startDelay,
268       bool runOnce);
269
270   // Return true if the current function is being canceled
271   bool cancelAllFunctionsWithLock(std::unique_lock<std::mutex>& lock);
272   bool cancelFunctionWithLock(
273       std::unique_lock<std::mutex>& lock,
274       StringPiece nameID);
275
276   std::thread thread_;
277
278   // Mutex to protect our member variables.
279   std::mutex mutex_;
280   bool running_{false};
281
282   // The functions to run.
283   // This is a heap, ordered by next run time.
284   FunctionHeap functions_;
285   FunctionMap functionsMap_;
286   RunTimeOrder fnCmp_;
287
288   // The function currently being invoked by the running thread.
289   // This is null when the running thread is idle
290   RepeatFunc* currentFunction_{nullptr};
291
292   // Condition variable that is signalled whenever a new function is added
293   // or when the FunctionScheduler is stopped.
294   std::condition_variable runningCondvar_;
295
296   std::string threadName_;
297   bool steady_{false};
298   bool cancellingCurrentFunction_{false};
299 };
300
301 }