0c590842a0adb3698164f6604e31b53be1a11c1d
[folly.git] / folly / experimental / FunctionScheduler.h
1 /*
2  * Copyright 2017 Facebook, Inc.
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  *   http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16
17 #pragma once
18
19 #include <folly/Function.h>
20 #include <folly/Range.h>
21 #include <chrono>
22 #include <condition_variable>
23 #include <mutex>
24 #include <thread>
25 #include <vector>
26
27 namespace folly {
28
29 /**
30  * Schedules any number of functions to run at various intervals. E.g.,
31  *
32  *   FunctionScheduler fs;
33  *
34  *   fs.addFunction([&] { LOG(INFO) << "tick..."; }, seconds(1), "ticker");
35  *   fs.addFunction(std::bind(&TestClass::doStuff, this), minutes(5), "stuff");
36  *   fs.start();
37  *   ........
38  *   fs.cancelFunction("ticker");
39  *   fs.addFunction([&] { LOG(INFO) << "tock..."; }, minutes(3), "tocker");
40  *   ........
41  *   fs.shutdown();
42  *
43  *
44  * Note: the class uses only one thread - if you want to use more than one
45  *       thread, either use multiple FunctionScheduler objects, or check out
46  *       ThreadedRepeatingFunctionRunner.h for a much simpler contract of
47  *       "run each function periodically in its own thread".
48  *
49  * start() schedules the functions, while shutdown() terminates further
50  * scheduling.
51  */
52 class FunctionScheduler {
53  public:
54   FunctionScheduler();
55   ~FunctionScheduler();
56
57   /**
58    * By default steady is false, meaning schedules may lag behind overtime.
59    * This could be due to long running tasks or time drift because of randomness
60    * in thread wakeup time.
61    * By setting steady to true, FunctionScheduler will attempt to catch up.
62    * i.e. more like a cronjob
63    *
64    * NOTE: it's only safe to set this before calling start()
65    */
66   void setSteady(bool steady) { steady_ = steady; }
67
68   /*
69    * Parameters to control the function interval.
70    *
71    * If isPoisson is true, then use std::poisson_distribution to pick the
72    * interval between each invocation of the function.
73    *
74    * If isPoisson os false, then always use fixed the interval specified to
75    * addFunction().
76    */
77   struct LatencyDistribution {
78     bool isPoisson;
79     double poissonMean;
80
81     LatencyDistribution(bool poisson, double mean)
82       : isPoisson(poisson),
83         poissonMean(mean) {
84     }
85   };
86
87   /**
88    * Adds a new function to the FunctionScheduler.
89    *
90    * Functions will not be run until start() is called.  When start() is
91    * called, each function will be run after its specified startDelay.
92    * Functions may also be added after start() has been called, in which case
93    * startDelay is still honored.
94    *
95    * Throws an exception on error.  In particular, each function must have a
96    * unique name--two functions cannot be added with the same name.
97    */
98   void addFunction(Function<void()>&& cb,
99                    std::chrono::milliseconds interval,
100                    StringPiece nameID = StringPiece(),
101                    std::chrono::milliseconds startDelay =
102                      std::chrono::milliseconds(0));
103
104   /*
105    * Add a new function to the FunctionScheduler with a specified
106    * LatencyDistribution
107    */
108   void addFunction(
109       Function<void()>&& cb,
110       std::chrono::milliseconds interval,
111       const LatencyDistribution& latencyDistr,
112       StringPiece nameID = StringPiece(),
113       std::chrono::milliseconds startDelay = std::chrono::milliseconds(0));
114
115   /**
116    * Adds a new function to the FunctionScheduler to run only once.
117    */
118   void addFunctionOnce(
119       Function<void()>&& cb,
120       StringPiece nameID = StringPiece(),
121       std::chrono::milliseconds startDelay = std::chrono::milliseconds(0));
122
123   /**
124     * Add a new function to the FunctionScheduler with the time
125     * interval being distributed uniformly within the given interval
126     * [minInterval, maxInterval].
127     */
128   void addFunctionUniformDistribution(Function<void()>&& cb,
129                                       std::chrono::milliseconds minInterval,
130                                       std::chrono::milliseconds maxInterval,
131                                       StringPiece nameID,
132                                       std::chrono::milliseconds startDelay);
133
134   /**
135    * A type alias for function that is called to determine the time
136    * interval for the next scheduled run.
137    */
138   using IntervalDistributionFunc = Function<std::chrono::milliseconds()>;
139
140   /**
141    * Add a new function to the FunctionScheduler. The scheduling interval
142    * is determined by the interval distribution functor, which is called
143    * every time the next function execution is scheduled. This allows
144    * for supporting custom interval distribution algorithms in addition
145    * to built in constant interval; and Poisson and jitter distributions
146    * (@see FunctionScheduler::addFunction and
147    * @see FunctionScheduler::addFunctionJitterInterval).
148    */
149   void addFunctionGenericDistribution(
150       Function<void()>&& cb,
151       IntervalDistributionFunc&& intervalFunc,
152       const std::string& nameID,
153       const std::string& intervalDescr,
154       std::chrono::milliseconds startDelay);
155
156   /**
157    * Cancels the function with the specified name, so it will no longer be run.
158    *
159    * Returns false if no function exists with the specified name.
160    */
161   bool cancelFunction(StringPiece nameID);
162   bool cancelFunctionAndWait(StringPiece nameID);
163
164   /**
165    * All functions registered will be canceled.
166    */
167   void cancelAllFunctions();
168   void cancelAllFunctionsAndWait();
169
170   /**
171    * Resets the specified function's timer.
172    * When resetFunctionTimer is called, the specified function's timer will
173    * be reset with the same parameters it was passed initially, including
174    * its startDelay. If the startDelay was 0, the function will be invoked
175    * immediately.
176    *
177    * Returns false if no function exists with the specified name.
178    */
179   bool resetFunctionTimer(StringPiece nameID);
180
181   /**
182    * Starts the scheduler.
183    *
184    * Returns false if the scheduler was already running.
185    */
186   bool start();
187
188   /**
189    * Stops the FunctionScheduler.
190    *
191    * It may be restarted later by calling start() again.
192    * Returns false if the scheduler was not running.
193    */
194   bool shutdown();
195
196   /**
197    * Set the name of the worker thread.
198    */
199   void setThreadName(StringPiece threadName);
200
201  private:
202   struct RepeatFunc {
203     Function<void()> cb;
204     IntervalDistributionFunc intervalFunc;
205     std::chrono::steady_clock::time_point nextRunTime;
206     std::string name;
207     std::chrono::milliseconds startDelay;
208     std::string intervalDescr;
209     bool runOnce;
210
211     RepeatFunc(
212         Function<void()>&& cback,
213         IntervalDistributionFunc&& intervalFn,
214         const std::string& nameID,
215         const std::string& intervalDistDescription,
216         std::chrono::milliseconds delay,
217         bool once)
218         : cb(std::move(cback)),
219           intervalFunc(std::move(intervalFn)),
220           nextRunTime(),
221           name(nameID),
222           startDelay(delay),
223           intervalDescr(intervalDistDescription),
224           runOnce(once) {}
225
226     std::chrono::steady_clock::time_point getNextRunTime() const {
227       return nextRunTime;
228     }
229     void setNextRunTimeStrict(std::chrono::steady_clock::time_point curTime) {
230       nextRunTime = curTime + intervalFunc();
231     }
232     void setNextRunTimeSteady() { nextRunTime += intervalFunc(); }
233     void resetNextRunTime(std::chrono::steady_clock::time_point curTime) {
234       nextRunTime = curTime + startDelay;
235     }
236     void cancel() {
237       // Simply reset cb to an empty function.
238       cb = {};
239     }
240     bool isValid() const { return bool(cb); }
241   };
242
243   struct RunTimeOrder {
244     bool operator()(const RepeatFunc& f1, const RepeatFunc& f2) const {
245       return f1.getNextRunTime() > f2.getNextRunTime();
246     }
247   };
248
249   typedef std::vector<RepeatFunc> FunctionHeap;
250
251   void run();
252   void runOneFunction(std::unique_lock<std::mutex>& lock,
253                       std::chrono::steady_clock::time_point now);
254   void cancelFunction(const std::unique_lock<std::mutex>& lock,
255                       FunctionHeap::iterator it);
256   void addFunctionToHeap(const std::unique_lock<std::mutex>& lock,
257                          RepeatFunc&& func);
258
259   void addFunctionInternal(
260       Function<void()>&& cb,
261       IntervalDistributionFunc&& intervalFunc,
262       const std::string& nameID,
263       const std::string& intervalDescr,
264       std::chrono::milliseconds startDelay,
265       bool runOnce);
266
267   std::thread thread_;
268
269   // Mutex to protect our member variables.
270   std::mutex mutex_;
271   bool running_{false};
272
273   // The functions to run.
274   // This is a heap, ordered by next run time.
275   FunctionHeap functions_;
276   RunTimeOrder fnCmp_;
277
278   // The function currently being invoked by the running thread.
279   // This is null when the running thread is idle
280   RepeatFunc* currentFunction_{nullptr};
281
282   // Condition variable that is signalled whenever a new function is added
283   // or when the FunctionScheduler is stopped.
284   std::condition_variable runningCondvar_;
285
286   std::string threadName_;
287   bool steady_{false};
288 };
289
290 }