expose FunctionScheduler::addFunction() with a custom distribution
[folly.git] / folly / experimental / FunctionScheduler.h
1 /*
2  * Copyright 2015 Facebook, Inc.
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  *   http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16
17 #ifndef FOLLY_EXPERIMENTAL_FUNCTION_SCHEDULER_H_
18 #define FOLLY_EXPERIMENTAL_FUNCTION_SCHEDULER_H_
19
20 #include <folly/Range.h>
21 #include <chrono>
22 #include <condition_variable>
23 #include <mutex>
24 #include <thread>
25 #include <vector>
26 #include <random>
27
28 namespace folly {
29
30 /**
31  * Schedules any number of functions to run at various intervals. E.g.,
32  *
33  *   FunctionScheduler fs;
34  *
35  *   fs.addFunction([&] { LOG(INFO) << "tick..."; }, seconds(1), "ticker");
36  *   fs.addFunction(std::bind(&TestClass::doStuff, this), minutes(5), "stuff");
37  *   fs.start();
38  *   ........
39  *   fs.cancelFunction("ticker");
40  *   fs.addFunction([&] { LOG(INFO) << "tock..."; }, minutes(3), "tocker");
41  *   ........
42  *   fs.shutdown();
43  *
44  *
45  * Note: the class uses only one thread - if you want to use more than one
46  *       thread use multiple FunctionScheduler objects
47  *
48  * start() schedules the functions, while shutdown() terminates further
49  * scheduling.
50  */
51 class FunctionScheduler {
52  public:
53   FunctionScheduler();
54   ~FunctionScheduler();
55
56   /**
57    * By default steady is false, meaning schedules may lag behind overtime.
58    * This could be due to long running tasks or time drift because of randomness
59    * in thread wakeup time.
60    * By setting steady to true, FunctionScheduler will attempt to catch up.
61    * i.e. more like a cronjob
62    *
63    * NOTE: it's only safe to set this before calling start()
64    */
65   void setSteady(bool steady) { steady_ = steady; }
66
67   /*
68    * Parameters to control the function interval.
69    *
70    * If isPoisson is true, then use std::poisson_distribution to pick the
71    * interval between each invocation of the function.
72    *
73    * If isPoisson os false, then always use fixed the interval specified to
74    * addFunction().
75    */
76   struct LatencyDistribution {
77     bool isPoisson;
78     double poissonMean;
79
80     LatencyDistribution(bool poisson, double mean)
81       : isPoisson(poisson),
82         poissonMean(mean) {
83     }
84   };
85
86   /**
87    * Adds a new function to the FunctionScheduler.
88    *
89    * Functions will not be run until start() is called.  When start() is
90    * called, each function will be run after its specified startDelay.
91    * Functions may also be added after start() has been called, in which case
92    * startDelay is still honored.
93    *
94    * Throws an exception on error.  In particular, each function must have a
95    * unique name--two functions cannot be added with the same name.
96    */
97   void addFunction(const std::function<void()>& cb,
98                    std::chrono::milliseconds interval,
99                    StringPiece nameID = StringPiece(),
100                    std::chrono::milliseconds startDelay =
101                      std::chrono::milliseconds(0));
102
103   /*
104    * Add a new function to the FunctionScheduler with a specified
105    * LatencyDistribution
106    */
107   void addFunction(const std::function<void()>& cb,
108                    std::chrono::milliseconds interval,
109                    const LatencyDistribution& latencyDistr,
110                    StringPiece nameID = StringPiece(),
111                    std::chrono::milliseconds startDelay =
112                       std::chrono::milliseconds(0));
113
114   /**
115    * Cancels the function with the specified name, so it will no longer be run.
116    *
117    * Returns false if no function exists with the specified name.
118    */
119   bool cancelFunction(StringPiece nameID);
120
121   /**
122    * All functions registered will be canceled.
123    */
124   void cancelAllFunctions();
125
126   /**
127    * Starts the scheduler.
128    *
129    * Returns false if the scheduler was already running.
130    */
131   bool start();
132
133   /**
134    * Stops the FunctionScheduler.
135    *
136    * It may be restarted later by calling start() again.
137    */
138   void shutdown();
139
140   /**
141    * Set the name of the worker thread.
142    */
143   void setThreadName(StringPiece threadName);
144
145
146  private:
147   struct RepeatFunc {
148     std::function<void()> cb;
149     std::chrono::milliseconds timeInterval;
150     std::chrono::steady_clock::time_point lastRunTime;
151     std::string name;
152     std::chrono::milliseconds startDelay;
153     bool isPoissonDistr;
154     std::default_random_engine generator;
155     std::poisson_distribution<int> poisson_random;
156
157     RepeatFunc(const std::function<void()>& cback,
158                std::chrono::milliseconds interval,
159                const std::string& nameID,
160                std::chrono::milliseconds delay,
161                bool poisson = false,
162                double meanPoisson = 1.0)
163       : cb(cback),
164         timeInterval(interval),
165         lastRunTime(),
166         name(nameID),
167         startDelay(delay),
168         isPoissonDistr(poisson),
169         poisson_random(meanPoisson) {
170     }
171
172     std::chrono::steady_clock::time_point getNextRunTime() const {
173       return lastRunTime + timeInterval;
174     }
175     void setNextRunTime(std::chrono::steady_clock::time_point time) {
176       lastRunTime = time - timeInterval;
177     }
178     void setTimeIntervalPoissonDistr() {
179       if (isPoissonDistr) {
180         timeInterval = std::chrono::milliseconds(poisson_random(generator));
181       }
182     }
183     void cancel() {
184       // Simply reset cb to an empty function.
185       cb = std::function<void()>();
186     }
187     bool isValid() const {
188       return bool(cb);
189     }
190   };
191   struct RunTimeOrder {
192     bool operator()(const RepeatFunc& f1, const RepeatFunc& f2) const {
193       return f1.getNextRunTime() > f2.getNextRunTime();
194     }
195   };
196   typedef std::vector<RepeatFunc> FunctionHeap;
197
198   void run();
199   void runOneFunction(std::unique_lock<std::mutex>& lock,
200                       std::chrono::steady_clock::time_point now);
201   void cancelFunction(const std::unique_lock<std::mutex> &lock,
202                       FunctionHeap::iterator it);
203
204   std::thread thread_;
205
206   // Mutex to protect our member variables.
207   std::mutex mutex_;
208   bool running_{false};
209
210   // The functions to run.
211   // This is a heap, ordered by next run time.
212   FunctionHeap functions_;
213   RunTimeOrder fnCmp_;
214
215   // The function currently being invoked by the running thread.
216   // This is null when the running thread is idle
217   RepeatFunc* currentFunction_{nullptr};
218
219   // Condition variable that is signalled whenever a new function is added
220   // or when the FunctionScheduler is stopped.
221   std::condition_variable runningCondvar_;
222
223   std::string threadName_;
224   bool steady_{false};
225 };
226
227 }
228
229 #endif