Migrate FunctionScheduler from common/concurrency/ to folly/experimental/
[folly.git] / folly / experimental / FunctionScheduler.h
1 /*
2  * Copyright 2015 Facebook, Inc.
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  *   http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16
17 #ifndef FOLLY_EXPERIMENTAL_FUNCTION_SCHEDULER_H_
18 #define FOLLY_EXPERIMENTAL_FUNCTION_SCHEDULER_H_
19
20 #include <folly/Range.h>
21 #include <chrono>
22 #include <condition_variable>
23 #include <mutex>
24 #include <thread>
25 #include <vector>
26 #include <random>
27
28 namespace folly {
29
30 /**
31  * Schedules any number of functions to run at various intervals. E.g.,
32  *
33  *   FunctionScheduler fs;
34  *
35  *   fs.addFunction([&] { LOG(INFO) << "tick..."; }, seconds(1), "ticker");
36  *   fs.addFunction(std::bind(&TestClass::doStuff, this), minutes(5), "stuff");
37  *   fs.start();
38  *   ........
39  *   fs.cancelFunction("ticker");
40  *   fs.addFunction([&] { LOG(INFO) << "tock..."; }, minutes(3), "tocker");
41  *   ........
42  *   fs.shutdown();
43  *
44  *
45  * Note: the class uses only one thread - if you want to use more than one
46  *       thread use multiple FunctionScheduler objects
47  *
48  * start() schedules the functions, while shutdown() terminates further
49  * scheduling.
50  */
51 class FunctionScheduler {
52  public:
53   FunctionScheduler();
54   ~FunctionScheduler();
55
56   /**
57    * By default steady is false, meaning schedules may lag behind overtime.
58    * This could be due to long running tasks or time drift because of randomness
59    * in thread wakeup time.
60    * By setting steady to true, FunctionScheduler will attempt to catch up.
61    * i.e. more like a cronjob
62    *
63    * NOTE: it's only safe to set this before calling start()
64    */
65   void setSteady(bool steady) { steady_ = steady; }
66
67   struct LatencyDistribution {
68     bool isPoisson;
69     double poissonMean;
70
71     LatencyDistribution(bool poisson,
72                  double mean)
73       : isPoisson(poisson),
74         poissonMean(mean) {
75     }
76
77   };
78
79   /**
80    * Adds a new function to the FunctionScheduler.
81    *
82    * Functions will not be run until start() is called.  When start() is
83    * called, each function will be run after its specified startDelay.
84    * Functions may also be added after start() has been called, in which case
85    * startDelay is still honored.
86    *
87    * Throws an exception on error.  In particular, each function must have a
88    * unique name--two functions cannot be added with the same name.
89    */
90   void addFunction(const std::function<void()>& cb,
91                    std::chrono::milliseconds interval,
92                    StringPiece nameID = StringPiece(),
93                    std::chrono::milliseconds startDelay =
94                      std::chrono::milliseconds(0));
95
96   /**
97    * Cancels the function with the specified name, so it will no longer be run.
98    *
99    * Returns false if no function exists with the specified name.
100    */
101   bool cancelFunction(StringPiece nameID);
102
103   /**
104    * All functions registered will be canceled.
105    */
106   void cancelAllFunctions();
107
108   /**
109    * Starts the scheduler.
110    *
111    * Returns false if the scheduler was already running.
112    */
113   bool start();
114
115   /**
116    * Stops the FunctionScheduler.
117    *
118    * It may be restarted later by calling start() again.
119    */
120   void shutdown();
121
122   /**
123    * Set the name of the worker thread.
124    */
125   void setThreadName(StringPiece threadName);
126
127
128  private:
129   void addFunctionInternal(const std::function<void()>& cb,
130                    std::chrono::milliseconds interval,
131                    const LatencyDistribution& latencyDistr,
132                    StringPiece nameID = StringPiece(),
133                    std::chrono::milliseconds startDelay =
134                       std::chrono::milliseconds(0));
135   struct RepeatFunc {
136     std::function<void()> cb;
137     std::chrono::milliseconds timeInterval;
138     std::chrono::milliseconds lastRunTime;
139     std::string name;
140     std::chrono::milliseconds startDelay;
141     bool isPoissonDistr;
142     std::default_random_engine generator;
143     std::poisson_distribution<int> poisson_random;
144
145     RepeatFunc(const std::function<void()>& cback,
146                std::chrono::milliseconds interval,
147                const std::string& nameID,
148                std::chrono::milliseconds delay,
149                bool poisson = false,
150                double meanPoisson = 1.0)
151       : cb(cback),
152         timeInterval(interval),
153         lastRunTime(0),
154         name(nameID),
155         startDelay(delay),
156         isPoissonDistr(poisson),
157         poisson_random(meanPoisson) {
158     }
159
160     std::chrono::milliseconds getNextRunTime() const {
161       return lastRunTime + timeInterval;
162     }
163     void setNextRunTime(std::chrono::milliseconds time) {
164       lastRunTime = time - timeInterval;
165     }
166     void setTimeIntervalPoissonDistr() {
167       if (isPoissonDistr) {
168         timeInterval = std::chrono::milliseconds(poisson_random(generator));
169       }
170     }
171     void cancel() {
172       // Simply reset cb to an empty function.
173       cb = std::function<void()>();
174     }
175     bool isValid() const {
176       return bool(cb);
177     }
178   };
179   struct RunTimeOrder {
180     bool operator()(const RepeatFunc& f1, const RepeatFunc& f2) const {
181       return f1.getNextRunTime() > f2.getNextRunTime();
182     }
183   };
184   typedef std::vector<RepeatFunc> FunctionHeap;
185
186   void run();
187   void runOneFunction(std::unique_lock<std::mutex>& lock,
188                       std::chrono::milliseconds now);
189   void cancelFunction(const std::unique_lock<std::mutex> &lock,
190                       FunctionHeap::iterator it);
191
192   std::thread thread_;
193
194   // Mutex to protect our member variables.
195   std::mutex mutex_;
196   bool running_{false};
197
198   // The functions to run.
199   // This is a heap, ordered by next run time.
200   FunctionHeap functions_;
201   RunTimeOrder fnCmp_;
202
203   // The function currently being invoked by the running thread.
204   // This is null when the running thread is idle
205   RepeatFunc* currentFunction_{nullptr};
206
207   // Condition variable that is signalled whenever a new function is added
208   // or when the FunctionScheduler is stopped.
209   std::condition_variable runningCondvar_;
210
211   std::string threadName_;
212   bool steady_{false};
213 };
214
215 }
216
217 #endif