folly::FunctionScheduler: Adding support for uniform interval distribution
[folly.git] / folly / experimental / FunctionScheduler.cpp
1 /*
2  * Copyright 2015 Facebook, Inc.
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  *   http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16
17 #include <folly/experimental/FunctionScheduler.h>
18
19 #include <random>
20
21 #include <folly/Conv.h>
22 #include <folly/Random.h>
23 #include <folly/String.h>
24 #include <folly/ThreadName.h>
25
26 using std::chrono::milliseconds;
27 using std::chrono::steady_clock;
28
29 namespace folly {
30
31 namespace {
32
33 struct ConstIntervalFunctor {
34   const milliseconds constInterval;
35
36   explicit ConstIntervalFunctor(milliseconds interval)
37       : constInterval(interval) {
38     if (interval < milliseconds::zero()) {
39       throw std::invalid_argument(
40           "FunctionScheduler: "
41           "time interval must be non-negative");
42     }
43   }
44
45   milliseconds operator()() const { return constInterval; }
46 };
47
48 struct PoissonDistributionFunctor {
49   std::default_random_engine generator;
50   std::poisson_distribution<int> poissonRandom;
51
52   explicit PoissonDistributionFunctor(double meanPoissonMs)
53       : poissonRandom(meanPoissonMs) {
54     if (meanPoissonMs < 0.0) {
55       throw std::invalid_argument(
56           "FunctionScheduler: "
57           "Poisson mean interval must be non-negative");
58     }
59   }
60
61   milliseconds operator()() { return milliseconds(poissonRandom(generator)); }
62 };
63
64 struct UniformDistributionFunctor {
65   std::default_random_engine generator;
66   std::uniform_int_distribution<> dist;
67
68   UniformDistributionFunctor(milliseconds minInterval, milliseconds maxInterval)
69       : generator(Random::rand32()),
70         dist(minInterval.count(), maxInterval.count()) {
71     if (minInterval > maxInterval) {
72       throw std::invalid_argument(
73           "FunctionScheduler: "
74           "min time interval must be less or equal than max interval");
75     }
76     if (minInterval < milliseconds::zero()) {
77       throw std::invalid_argument(
78           "FunctionScheduler: "
79           "time interval must be non-negative");
80     }
81   }
82
83   milliseconds operator()() { return milliseconds(dist(generator)); }
84 };
85
86 } // anonymous namespace
87
88 FunctionScheduler::FunctionScheduler() {}
89
90 FunctionScheduler::~FunctionScheduler() {
91   // make sure to stop the thread (if running)
92   shutdown();
93 }
94
95 void FunctionScheduler::addFunction(const std::function<void()>& cb,
96                                     milliseconds interval,
97                                     StringPiece nameID,
98                                     milliseconds startDelay) {
99   addFunctionGenericDistribution(
100       cb,
101       IntervalDistributionFunc(ConstIntervalFunctor(interval)),
102       nameID.str(),
103       to<std::string>(interval.count(), "ms"),
104       startDelay);
105 }
106
107 void FunctionScheduler::addFunction(const std::function<void()>& cb,
108                                     milliseconds interval,
109                                     const LatencyDistribution& latencyDistr,
110                                     StringPiece nameID,
111                                     milliseconds startDelay) {
112   if (latencyDistr.isPoisson) {
113     addFunctionGenericDistribution(
114         cb,
115         IntervalDistributionFunc(
116             PoissonDistributionFunctor(latencyDistr.poissonMean)),
117         nameID.str(),
118         to<std::string>(latencyDistr.poissonMean, "ms (Poisson mean)"),
119         startDelay);
120   } else {
121     addFunction(cb, interval, nameID, startDelay);
122   }
123 }
124
125 void FunctionScheduler::addFunctionUniformDistribution(
126     const std::function<void()>& cb,
127     milliseconds minInterval,
128     milliseconds maxInterval,
129     StringPiece nameID,
130     milliseconds startDelay) {
131   addFunctionGenericDistribution(
132       cb,
133       IntervalDistributionFunc(
134           UniformDistributionFunctor(minInterval, maxInterval)),
135       nameID.str(),
136       to<std::string>(
137           "[", minInterval.count(), " , ", maxInterval.count(), "] ms"),
138       startDelay);
139 }
140
141 void FunctionScheduler::addFunctionGenericDistribution(
142     const std::function<void()>& cb,
143     const IntervalDistributionFunc& intervalFunc,
144     const std::string& nameID,
145     const std::string& intervalDescr,
146     milliseconds startDelay) {
147   if (!cb) {
148     throw std::invalid_argument(
149         "FunctionScheduler: Scheduled function must be set");
150   }
151   if (!intervalFunc) {
152     throw std::invalid_argument(
153         "FunctionScheduler: interval distribution function must be set");
154   }
155   if (startDelay < milliseconds::zero()) {
156     throw std::invalid_argument(
157         "FunctionScheduler: start delay must be non-negative");
158   }
159
160   std::lock_guard<std::mutex> l(mutex_);
161   // check if the nameID is unique
162   for (const auto& f : functions_) {
163     if (f.isValid() && f.name == nameID) {
164       throw std::invalid_argument(
165           to<std::string>("FunctionScheduler: a function named \"",
166                           nameID,
167                           "\" already exists"));
168     }
169   }
170   if (currentFunction_ && currentFunction_->name == nameID) {
171     throw std::invalid_argument(to<std::string>(
172         "FunctionScheduler: a function named \"", nameID, "\" already exists"));
173   }
174
175   functions_.emplace_back(cb, intervalFunc, nameID, intervalDescr, startDelay);
176   if (running_) {
177     functions_.back().resetNextRunTime(steady_clock::now());
178     std::push_heap(functions_.begin(), functions_.end(), fnCmp_);
179     // Signal the running thread to wake up and see if it needs to change it's
180     // current scheduling decision.
181     runningCondvar_.notify_one();
182   }
183 }
184
185 bool FunctionScheduler::cancelFunction(StringPiece nameID) {
186   std::unique_lock<std::mutex> l(mutex_);
187
188   if (currentFunction_ && currentFunction_->name == nameID) {
189     // This function is currently being run.  Clear currentFunction_
190     // The running thread will see this and won't reschedule the function.
191     currentFunction_ = nullptr;
192     return true;
193   }
194
195   for (auto it = functions_.begin(); it != functions_.end(); ++it) {
196     if (it->isValid() && it->name == nameID) {
197       cancelFunction(l, it);
198       return true;
199     }
200   }
201   return false;
202 }
203
204 void FunctionScheduler::cancelFunction(const std::unique_lock<std::mutex>& l,
205                                        FunctionHeap::iterator it) {
206   // This function should only be called with mutex_ already locked.
207   DCHECK(l.mutex() == &mutex_);
208   DCHECK(l.owns_lock());
209
210   if (running_) {
211     // Internally gcc has an __adjust_heap() function to fill in a hole in the
212     // heap.  Unfortunately it isn't part of the standard API.
213     //
214     // For now we just leave the RepeatFunc in our heap, but mark it as unused.
215     // When it's nextTimeInterval comes up, the runner thread will pop it from
216     // the heap and simply throw it away.
217     it->cancel();
218   } else {
219     // We're not running, so functions_ doesn't need to be maintained in heap
220     // order.
221     functions_.erase(it);
222   }
223 }
224
225 void FunctionScheduler::cancelAllFunctions() {
226   std::unique_lock<std::mutex> l(mutex_);
227   functions_.clear();
228 }
229
230 bool FunctionScheduler::start() {
231   std::unique_lock<std::mutex> l(mutex_);
232   if (running_) {
233     return false;
234   }
235
236   running_ = true;
237
238   VLOG(1) << "Starting FunctionScheduler with " << functions_.size()
239           << " functions.";
240   auto now = steady_clock::now();
241   // Reset the next run time. for all functions.
242   // note: this is needed since one can shutdown() and start() again
243   for (auto& f : functions_) {
244     f.resetNextRunTime(now);
245     VLOG(1) << "   - func: " << (f.name.empty() ? "(anon)" : f.name.c_str())
246             << ", period = " << f.intervalDescr
247             << ", delay = " << f.startDelay.count() << "ms";
248   }
249   std::make_heap(functions_.begin(), functions_.end(), fnCmp_);
250
251   thread_ = std::thread([&] { this->run(); });
252   return true;
253 }
254
255 void FunctionScheduler::shutdown() {
256   {
257     std::lock_guard<std::mutex> g(mutex_);
258     if (!running_) {
259       return;
260     }
261
262     running_ = false;
263     runningCondvar_.notify_one();
264   }
265   thread_.join();
266 }
267
268 void FunctionScheduler::run() {
269   std::unique_lock<std::mutex> lock(mutex_);
270
271   if (!threadName_.empty()) {
272     folly::setThreadName(threadName_);
273   }
274
275   while (running_) {
276     // If we have nothing to run, wait until a function is added or until we
277     // are stopped.
278     if (functions_.empty()) {
279       runningCondvar_.wait(lock);
280       continue;
281     }
282
283     auto now = steady_clock::now();
284
285     // Move the next function to run to the end of functions_
286     std::pop_heap(functions_.begin(), functions_.end(), fnCmp_);
287
288     // Check to see if the function was cancelled.
289     // If so, just remove it and continue around the loop.
290     if (!functions_.back().isValid()) {
291       functions_.pop_back();
292       continue;
293     }
294
295     auto sleepTime = functions_.back().getNextRunTime() - now;
296     if (sleepTime < milliseconds::zero()) {
297       // We need to run this function now
298       runOneFunction(lock, now);
299     } else {
300       // Re-add the function to the heap, and wait until we actually
301       // need to run it.
302       std::push_heap(functions_.begin(), functions_.end(), fnCmp_);
303       runningCondvar_.wait_for(lock, sleepTime);
304     }
305   }
306 }
307
308 void FunctionScheduler::runOneFunction(std::unique_lock<std::mutex>& lock,
309                                        steady_clock::time_point now) {
310   DCHECK(lock.mutex() == &mutex_);
311   DCHECK(lock.owns_lock());
312
313   // The function to run will be at the end of functions_ already.
314   //
315   // Fully remove it from functions_ now.
316   // We need to release mutex_ while we invoke this function, and we need to
317   // maintain the heap property on functions_ while mutex_ is unlocked.
318   RepeatFunc func(std::move(functions_.back()));
319   functions_.pop_back();
320   if (!func.cb) {
321     VLOG(5) << func.name << "function has been canceled while waiting";
322     return;
323   }
324   currentFunction_ = &func;
325
326   // Update the function's next run time.
327   if (steady_) {
328     // This allows scheduler to catch up
329     func.setNextRunTimeSteady();
330   } else {
331     // Note that we set nextRunTime based on the current time where we started
332     // the function call, rather than the time when the function finishes.
333     // This ensures that we call the function once every time interval, as
334     // opposed to waiting time interval seconds between calls.  (These can be
335     // different if the function takes a significant amount of time to run.)
336     func.setNextRunTimeStrict(now);
337   }
338
339   // Release the lock while we invoke the user's function
340   lock.unlock();
341
342   // Invoke the function
343   try {
344     VLOG(5) << "Now running " << func.name;
345     func.cb();
346   } catch (const std::exception& ex) {
347     LOG(ERROR) << "Error running the scheduled function <"
348       << func.name << ">: " << exceptionStr(ex);
349   }
350
351   // Re-acquire the lock
352   lock.lock();
353
354   if (!currentFunction_) {
355     // The function was cancelled while we were running it.
356     // We shouldn't reschedule it;
357     return;
358   }
359   // Clear currentFunction_
360   CHECK_EQ(currentFunction_, &func);
361   currentFunction_ = nullptr;
362
363   // Re-insert the function into our functions_ heap.
364   // We only maintain the heap property while running_ is set.  (running_ may
365   // have been cleared while we were invoking the user's function.)
366   functions_.push_back(std::move(func));
367   if (running_) {
368     std::push_heap(functions_.begin(), functions_.end(), fnCmp_);
369   }
370 }
371
372 void FunctionScheduler::setThreadName(StringPiece threadName) {
373   std::unique_lock<std::mutex> l(mutex_);
374   threadName_ = threadName.str();
375 }
376
377 }