2017
[folly.git] / folly / experimental / FunctionScheduler.h
1 /*
2  * Copyright 2017 Facebook, Inc.
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  *   http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16
17 #pragma once
18
19 #include <folly/Function.h>
20 #include <folly/Range.h>
21 #include <chrono>
22 #include <condition_variable>
23 #include <mutex>
24 #include <thread>
25 #include <vector>
26
27 namespace folly {
28
29 /**
30  * Schedules any number of functions to run at various intervals. E.g.,
31  *
32  *   FunctionScheduler fs;
33  *
34  *   fs.addFunction([&] { LOG(INFO) << "tick..."; }, seconds(1), "ticker");
35  *   fs.addFunction(std::bind(&TestClass::doStuff, this), minutes(5), "stuff");
36  *   fs.start();
37  *   ........
38  *   fs.cancelFunction("ticker");
39  *   fs.addFunction([&] { LOG(INFO) << "tock..."; }, minutes(3), "tocker");
40  *   ........
41  *   fs.shutdown();
42  *
43  *
44  * Note: the class uses only one thread - if you want to use more than one
45  *       thread use multiple FunctionScheduler objects
46  *
47  * start() schedules the functions, while shutdown() terminates further
48  * scheduling.
49  */
50 class FunctionScheduler {
51  public:
52   FunctionScheduler();
53   ~FunctionScheduler();
54
55   /**
56    * By default steady is false, meaning schedules may lag behind overtime.
57    * This could be due to long running tasks or time drift because of randomness
58    * in thread wakeup time.
59    * By setting steady to true, FunctionScheduler will attempt to catch up.
60    * i.e. more like a cronjob
61    *
62    * NOTE: it's only safe to set this before calling start()
63    */
64   void setSteady(bool steady) { steady_ = steady; }
65
66   /*
67    * Parameters to control the function interval.
68    *
69    * If isPoisson is true, then use std::poisson_distribution to pick the
70    * interval between each invocation of the function.
71    *
72    * If isPoisson os false, then always use fixed the interval specified to
73    * addFunction().
74    */
75   struct LatencyDistribution {
76     bool isPoisson;
77     double poissonMean;
78
79     LatencyDistribution(bool poisson, double mean)
80       : isPoisson(poisson),
81         poissonMean(mean) {
82     }
83   };
84
85   /**
86    * Adds a new function to the FunctionScheduler.
87    *
88    * Functions will not be run until start() is called.  When start() is
89    * called, each function will be run after its specified startDelay.
90    * Functions may also be added after start() has been called, in which case
91    * startDelay is still honored.
92    *
93    * Throws an exception on error.  In particular, each function must have a
94    * unique name--two functions cannot be added with the same name.
95    */
96   void addFunction(Function<void()>&& cb,
97                    std::chrono::milliseconds interval,
98                    StringPiece nameID = StringPiece(),
99                    std::chrono::milliseconds startDelay =
100                      std::chrono::milliseconds(0));
101
102   /*
103    * Add a new function to the FunctionScheduler with a specified
104    * LatencyDistribution
105    */
106   void addFunction(
107       Function<void()>&& cb,
108       std::chrono::milliseconds interval,
109       const LatencyDistribution& latencyDistr,
110       StringPiece nameID = StringPiece(),
111       std::chrono::milliseconds startDelay = std::chrono::milliseconds(0));
112
113   /**
114    * Adds a new function to the FunctionScheduler to run only once.
115    */
116   void addFunctionOnce(
117       Function<void()>&& cb,
118       StringPiece nameID = StringPiece(),
119       std::chrono::milliseconds startDelay = std::chrono::milliseconds(0));
120
121   /**
122     * Add a new function to the FunctionScheduler with the time
123     * interval being distributed uniformly within the given interval
124     * [minInterval, maxInterval].
125     */
126   void addFunctionUniformDistribution(Function<void()>&& cb,
127                                       std::chrono::milliseconds minInterval,
128                                       std::chrono::milliseconds maxInterval,
129                                       StringPiece nameID,
130                                       std::chrono::milliseconds startDelay);
131
132   /**
133    * A type alias for function that is called to determine the time
134    * interval for the next scheduled run.
135    */
136   using IntervalDistributionFunc = Function<std::chrono::milliseconds()>;
137
138   /**
139    * Add a new function to the FunctionScheduler. The scheduling interval
140    * is determined by the interval distribution functor, which is called
141    * every time the next function execution is scheduled. This allows
142    * for supporting custom interval distribution algorithms in addition
143    * to built in constant interval; and Poisson and jitter distributions
144    * (@see FunctionScheduler::addFunction and
145    * @see FunctionScheduler::addFunctionJitterInterval).
146    */
147   void addFunctionGenericDistribution(
148       Function<void()>&& cb,
149       IntervalDistributionFunc&& intervalFunc,
150       const std::string& nameID,
151       const std::string& intervalDescr,
152       std::chrono::milliseconds startDelay);
153
154   /**
155    * Cancels the function with the specified name, so it will no longer be run.
156    *
157    * Returns false if no function exists with the specified name.
158    */
159   bool cancelFunction(StringPiece nameID);
160   bool cancelFunctionAndWait(StringPiece nameID);
161
162   /**
163    * All functions registered will be canceled.
164    */
165   void cancelAllFunctions();
166   void cancelAllFunctionsAndWait();
167
168   /**
169    * Resets the specified function's timer.
170    * When resetFunctionTimer is called, the specified function's timer will
171    * be reset with the same parameters it was passed initially, including
172    * its startDelay. If the startDelay was 0, the function will be invoked
173    * immediately.
174    *
175    * Returns false if no function exists with the specified name.
176    */
177   bool resetFunctionTimer(StringPiece nameID);
178
179   /**
180    * Starts the scheduler.
181    *
182    * Returns false if the scheduler was already running.
183    */
184   bool start();
185
186   /**
187    * Stops the FunctionScheduler.
188    *
189    * It may be restarted later by calling start() again.
190    * Returns false if the scheduler was not running.
191    */
192   bool shutdown();
193
194   /**
195    * Set the name of the worker thread.
196    */
197   void setThreadName(StringPiece threadName);
198
199  private:
200   struct RepeatFunc {
201     Function<void()> cb;
202     IntervalDistributionFunc intervalFunc;
203     std::chrono::steady_clock::time_point nextRunTime;
204     std::string name;
205     std::chrono::milliseconds startDelay;
206     std::string intervalDescr;
207     bool runOnce;
208
209     RepeatFunc(
210         Function<void()>&& cback,
211         IntervalDistributionFunc&& intervalFn,
212         const std::string& nameID,
213         const std::string& intervalDistDescription,
214         std::chrono::milliseconds delay,
215         bool once)
216         : cb(std::move(cback)),
217           intervalFunc(std::move(intervalFn)),
218           nextRunTime(),
219           name(nameID),
220           startDelay(delay),
221           intervalDescr(intervalDistDescription),
222           runOnce(once) {}
223
224     std::chrono::steady_clock::time_point getNextRunTime() const {
225       return nextRunTime;
226     }
227     void setNextRunTimeStrict(std::chrono::steady_clock::time_point curTime) {
228       nextRunTime = curTime + intervalFunc();
229     }
230     void setNextRunTimeSteady() { nextRunTime += intervalFunc(); }
231     void resetNextRunTime(std::chrono::steady_clock::time_point curTime) {
232       nextRunTime = curTime + startDelay;
233     }
234     void cancel() {
235       // Simply reset cb to an empty function.
236       cb = {};
237     }
238     bool isValid() const { return bool(cb); }
239   };
240
241   struct RunTimeOrder {
242     bool operator()(const RepeatFunc& f1, const RepeatFunc& f2) const {
243       return f1.getNextRunTime() > f2.getNextRunTime();
244     }
245   };
246
247   typedef std::vector<RepeatFunc> FunctionHeap;
248
249   void run();
250   void runOneFunction(std::unique_lock<std::mutex>& lock,
251                       std::chrono::steady_clock::time_point now);
252   void cancelFunction(const std::unique_lock<std::mutex>& lock,
253                       FunctionHeap::iterator it);
254   void addFunctionToHeap(const std::unique_lock<std::mutex>& lock,
255                          RepeatFunc&& func);
256
257   void addFunctionInternal(
258       Function<void()>&& cb,
259       IntervalDistributionFunc&& intervalFunc,
260       const std::string& nameID,
261       const std::string& intervalDescr,
262       std::chrono::milliseconds startDelay,
263       bool runOnce);
264
265   std::thread thread_;
266
267   // Mutex to protect our member variables.
268   std::mutex mutex_;
269   bool running_{false};
270
271   // The functions to run.
272   // This is a heap, ordered by next run time.
273   FunctionHeap functions_;
274   RunTimeOrder fnCmp_;
275
276   // The function currently being invoked by the running thread.
277   // This is null when the running thread is idle
278   RepeatFunc* currentFunction_{nullptr};
279
280   // Condition variable that is signalled whenever a new function is added
281   // or when the FunctionScheduler is stopped.
282   std::condition_variable runningCondvar_;
283
284   std::string threadName_;
285   bool steady_{false};
286 };
287
288 }