2a94501523340d28c82ddc07afbbdee20236cc1e
[folly.git] / folly / experimental / FunctionScheduler.cpp
1 /*
2  * Copyright 2017 Facebook, Inc.
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  *   http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16
17 #include <folly/experimental/FunctionScheduler.h>
18
19 #include <random>
20
21 #include <folly/Conv.h>
22 #include <folly/Random.h>
23 #include <folly/String.h>
24 #include <folly/ThreadName.h>
25
26 using std::chrono::milliseconds;
27 using std::chrono::steady_clock;
28
29 namespace folly {
30
31 namespace {
32
33 struct ConstIntervalFunctor {
34   const milliseconds constInterval;
35
36   explicit ConstIntervalFunctor(milliseconds interval)
37       : constInterval(interval) {
38     if (interval < milliseconds::zero()) {
39       throw std::invalid_argument(
40           "FunctionScheduler: "
41           "time interval must be non-negative");
42     }
43   }
44
45   milliseconds operator()() const { return constInterval; }
46 };
47
48 struct PoissonDistributionFunctor {
49   std::default_random_engine generator;
50   std::poisson_distribution<int> poissonRandom;
51
52   explicit PoissonDistributionFunctor(double meanPoissonMs)
53       : poissonRandom(meanPoissonMs) {
54     if (meanPoissonMs < 0.0) {
55       throw std::invalid_argument(
56           "FunctionScheduler: "
57           "Poisson mean interval must be non-negative");
58     }
59   }
60
61   milliseconds operator()() { return milliseconds(poissonRandom(generator)); }
62 };
63
64 struct UniformDistributionFunctor {
65   std::default_random_engine generator;
66   std::uniform_int_distribution<milliseconds::rep> dist;
67
68   UniformDistributionFunctor(milliseconds minInterval, milliseconds maxInterval)
69       : generator(Random::rand32()),
70         dist(minInterval.count(), maxInterval.count()) {
71     if (minInterval > maxInterval) {
72       throw std::invalid_argument(
73           "FunctionScheduler: "
74           "min time interval must be less or equal than max interval");
75     }
76     if (minInterval < milliseconds::zero()) {
77       throw std::invalid_argument(
78           "FunctionScheduler: "
79           "time interval must be non-negative");
80     }
81   }
82
83   milliseconds operator()() { return milliseconds(dist(generator)); }
84 };
85
86 } // anonymous namespace
87
88 FunctionScheduler::FunctionScheduler() {}
89
90 FunctionScheduler::~FunctionScheduler() {
91   // make sure to stop the thread (if running)
92   shutdown();
93 }
94
95 void FunctionScheduler::addFunction(Function<void()>&& cb,
96                                     milliseconds interval,
97                                     StringPiece nameID,
98                                     milliseconds startDelay) {
99   addFunctionInternal(
100       std::move(cb),
101       ConstIntervalFunctor(interval),
102       nameID.str(),
103       to<std::string>(interval.count(), "ms"),
104       startDelay,
105       false /*runOnce*/);
106 }
107
108 void FunctionScheduler::addFunction(Function<void()>&& cb,
109                                     milliseconds interval,
110                                     const LatencyDistribution& latencyDistr,
111                                     StringPiece nameID,
112                                     milliseconds startDelay) {
113   if (latencyDistr.isPoisson) {
114     addFunctionInternal(
115         std::move(cb),
116         PoissonDistributionFunctor(latencyDistr.poissonMean),
117         nameID.str(),
118         to<std::string>(latencyDistr.poissonMean, "ms (Poisson mean)"),
119         startDelay,
120         false /*runOnce*/);
121   } else {
122     addFunction(std::move(cb), interval, nameID, startDelay);
123   }
124 }
125
126 void FunctionScheduler::addFunctionOnce(
127     Function<void()>&& cb,
128     StringPiece nameID,
129     milliseconds startDelay) {
130   addFunctionInternal(
131       std::move(cb),
132       ConstIntervalFunctor(milliseconds::zero()),
133       nameID.str(),
134       "once",
135       startDelay,
136       true /*runOnce*/);
137 }
138
139 void FunctionScheduler::addFunctionUniformDistribution(
140     Function<void()>&& cb,
141     milliseconds minInterval,
142     milliseconds maxInterval,
143     StringPiece nameID,
144     milliseconds startDelay) {
145   addFunctionInternal(
146       std::move(cb),
147       UniformDistributionFunctor(minInterval, maxInterval),
148       nameID.str(),
149       to<std::string>(
150           "[", minInterval.count(), " , ", maxInterval.count(), "] ms"),
151       startDelay,
152       false /*runOnce*/);
153 }
154
155 void FunctionScheduler::addFunctionGenericDistribution(
156     Function<void()>&& cb,
157     IntervalDistributionFunc&& intervalFunc,
158     const std::string& nameID,
159     const std::string& intervalDescr,
160     milliseconds startDelay) {
161   addFunctionInternal(
162       std::move(cb),
163       std::move(intervalFunc),
164       nameID,
165       intervalDescr,
166       startDelay,
167       false /*runOnce*/);
168 }
169
170 void FunctionScheduler::addFunctionInternal(
171     Function<void()>&& cb,
172     IntervalDistributionFunc&& intervalFunc,
173     const std::string& nameID,
174     const std::string& intervalDescr,
175     milliseconds startDelay,
176     bool runOnce) {
177   if (!cb) {
178     throw std::invalid_argument(
179         "FunctionScheduler: Scheduled function must be set");
180   }
181   if (!intervalFunc) {
182     throw std::invalid_argument(
183         "FunctionScheduler: interval distribution function must be set");
184   }
185   if (startDelay < milliseconds::zero()) {
186     throw std::invalid_argument(
187         "FunctionScheduler: start delay must be non-negative");
188   }
189
190   std::unique_lock<std::mutex> l(mutex_);
191   // check if the nameID is unique
192   for (const auto& f : functions_) {
193     if (f.isValid() && f.name == nameID) {
194       throw std::invalid_argument(
195           to<std::string>("FunctionScheduler: a function named \"",
196                           nameID,
197                           "\" already exists"));
198     }
199   }
200   if (currentFunction_ && currentFunction_->name == nameID) {
201     throw std::invalid_argument(to<std::string>(
202         "FunctionScheduler: a function named \"", nameID, "\" already exists"));
203   }
204
205   addFunctionToHeap(
206       l,
207       RepeatFunc(
208           std::move(cb),
209           std::move(intervalFunc),
210           nameID,
211           intervalDescr,
212           startDelay,
213           runOnce));
214 }
215
216 bool FunctionScheduler::cancelFunctionWithLock(
217     std::unique_lock<std::mutex>& lock,
218     StringPiece nameID) {
219   CHECK_EQ(lock.owns_lock(), true);
220   if (currentFunction_ && currentFunction_->name == nameID) {
221     // This function is currently being run. Clear currentFunction_
222     // The running thread will see this and won't reschedule the function.
223     currentFunction_ = nullptr;
224     cancellingCurrentFunction_ = true;
225     return true;
226   }
227   return false;
228 }
229
230 bool FunctionScheduler::cancelFunction(StringPiece nameID) {
231   std::unique_lock<std::mutex> l(mutex_);
232
233   if (cancelFunctionWithLock(l, nameID)) {
234     return true;
235   }
236
237   for (auto it = functions_.begin(); it != functions_.end(); ++it) {
238     if (it->isValid() && it->name == nameID) {
239       cancelFunction(l, it);
240       return true;
241     }
242   }
243   return false;
244 }
245
246 bool FunctionScheduler::cancelFunctionAndWait(StringPiece nameID) {
247   std::unique_lock<std::mutex> l(mutex_);
248
249   if (cancelFunctionWithLock(l, nameID)) {
250     runningCondvar_.wait(l, [this]() { return !cancellingCurrentFunction_; });
251     return true;
252   }
253
254   for (auto it = functions_.begin(); it != functions_.end(); ++it) {
255     if (it->isValid() && it->name == nameID) {
256       cancelFunction(l, it);
257       return true;
258     }
259   }
260   return false;
261 }
262
263 void FunctionScheduler::cancelFunction(const std::unique_lock<std::mutex>& l,
264                                        FunctionHeap::iterator it) {
265   // This function should only be called with mutex_ already locked.
266   DCHECK(l.mutex() == &mutex_);
267   DCHECK(l.owns_lock());
268
269   if (running_) {
270     // Internally gcc has an __adjust_heap() function to fill in a hole in the
271     // heap.  Unfortunately it isn't part of the standard API.
272     //
273     // For now we just leave the RepeatFunc in our heap, but mark it as unused.
274     // When its nextTimeInterval comes up, the runner thread will pop it from
275     // the heap and simply throw it away.
276     it->cancel();
277   } else {
278     // We're not running, so functions_ doesn't need to be maintained in heap
279     // order.
280     functions_.erase(it);
281   }
282 }
283
284 bool FunctionScheduler::cancelAllFunctionsWithLock(
285     std::unique_lock<std::mutex>& lock) {
286   CHECK_EQ(lock.owns_lock(), true);
287   functions_.clear();
288   if (currentFunction_) {
289     cancellingCurrentFunction_ = true;
290   }
291   currentFunction_ = nullptr;
292   return cancellingCurrentFunction_;
293 }
294
295 void FunctionScheduler::cancelAllFunctions() {
296   std::unique_lock<std::mutex> l(mutex_);
297   cancelAllFunctionsWithLock(l);
298 }
299
300 void FunctionScheduler::cancelAllFunctionsAndWait() {
301   std::unique_lock<std::mutex> l(mutex_);
302   if (cancelAllFunctionsWithLock(l)) {
303     runningCondvar_.wait(l, [this]() { return !cancellingCurrentFunction_; });
304   }
305 }
306
307 bool FunctionScheduler::resetFunctionTimer(StringPiece nameID) {
308   std::unique_lock<std::mutex> l(mutex_);
309   if (currentFunction_ && currentFunction_->name == nameID) {
310     RepeatFunc* funcPtrCopy = currentFunction_;
311     // This function is currently being run. Clear currentFunction_
312     // to avoid rescheduling it, and add the function again to honor the
313     // startDelay.
314     currentFunction_ = nullptr;
315     addFunctionToHeap(l, std::move(*funcPtrCopy));
316     return true;
317   }
318
319   // Since __adjust_heap() isn't a part of the standard API, there's no way to
320   // fix the heap ordering if we adjust the key (nextRunTime) for the existing
321   // RepeatFunc. Instead, we just cancel it and add an identical object.
322   for (auto it = functions_.begin(); it != functions_.end(); ++it) {
323     if (it->isValid() && it->name == nameID) {
324       RepeatFunc funcCopy(std::move(*it));
325       cancelFunction(l, it);
326       addFunctionToHeap(l, std::move(funcCopy));
327       return true;
328     }
329   }
330   return false;
331 }
332
333 bool FunctionScheduler::start() {
334   std::unique_lock<std::mutex> l(mutex_);
335   if (running_) {
336     return false;
337   }
338
339   VLOG(1) << "Starting FunctionScheduler with " << functions_.size()
340           << " functions.";
341   auto now = steady_clock::now();
342   // Reset the next run time. for all functions.
343   // note: this is needed since one can shutdown() and start() again
344   for (auto& f : functions_) {
345     f.resetNextRunTime(now);
346     VLOG(1) << "   - func: " << (f.name.empty() ? "(anon)" : f.name.c_str())
347             << ", period = " << f.intervalDescr
348             << ", delay = " << f.startDelay.count() << "ms";
349   }
350   std::make_heap(functions_.begin(), functions_.end(), fnCmp_);
351
352   thread_ = std::thread([&] { this->run(); });
353   running_ = true;
354
355   return true;
356 }
357
358 bool FunctionScheduler::shutdown() {
359   {
360     std::lock_guard<std::mutex> g(mutex_);
361     if (!running_) {
362       return false;
363     }
364
365     running_ = false;
366     runningCondvar_.notify_one();
367   }
368   thread_.join();
369   return true;
370 }
371
372 void FunctionScheduler::run() {
373   std::unique_lock<std::mutex> lock(mutex_);
374
375   if (!threadName_.empty()) {
376     folly::setThreadName(threadName_);
377   }
378
379   while (running_) {
380     // If we have nothing to run, wait until a function is added or until we
381     // are stopped.
382     if (functions_.empty()) {
383       runningCondvar_.wait(lock);
384       continue;
385     }
386
387     auto now = steady_clock::now();
388
389     // Move the next function to run to the end of functions_
390     std::pop_heap(functions_.begin(), functions_.end(), fnCmp_);
391
392     // Check to see if the function was cancelled.
393     // If so, just remove it and continue around the loop.
394     if (!functions_.back().isValid()) {
395       functions_.pop_back();
396       continue;
397     }
398
399     auto sleepTime = functions_.back().getNextRunTime() - now;
400     if (sleepTime < milliseconds::zero()) {
401       // We need to run this function now
402       runOneFunction(lock, now);
403       runningCondvar_.notify_all();
404     } else {
405       // Re-add the function to the heap, and wait until we actually
406       // need to run it.
407       std::push_heap(functions_.begin(), functions_.end(), fnCmp_);
408       runningCondvar_.wait_for(lock, sleepTime);
409     }
410   }
411 }
412
413 void FunctionScheduler::runOneFunction(std::unique_lock<std::mutex>& lock,
414                                        steady_clock::time_point now) {
415   DCHECK(lock.mutex() == &mutex_);
416   DCHECK(lock.owns_lock());
417
418   // The function to run will be at the end of functions_ already.
419   //
420   // Fully remove it from functions_ now.
421   // We need to release mutex_ while we invoke this function, and we need to
422   // maintain the heap property on functions_ while mutex_ is unlocked.
423   RepeatFunc func(std::move(functions_.back()));
424   functions_.pop_back();
425   if (!func.cb) {
426     VLOG(5) << func.name << "function has been canceled while waiting";
427     return;
428   }
429   currentFunction_ = &func;
430
431   // Update the function's next run time.
432   if (steady_) {
433     // This allows scheduler to catch up
434     func.setNextRunTimeSteady();
435   } else {
436     // Note that we set nextRunTime based on the current time where we started
437     // the function call, rather than the time when the function finishes.
438     // This ensures that we call the function once every time interval, as
439     // opposed to waiting time interval seconds between calls.  (These can be
440     // different if the function takes a significant amount of time to run.)
441     func.setNextRunTimeStrict(now);
442   }
443
444   // Release the lock while we invoke the user's function
445   lock.unlock();
446
447   // Invoke the function
448   try {
449     VLOG(5) << "Now running " << func.name;
450     func.cb();
451   } catch (const std::exception& ex) {
452     LOG(ERROR) << "Error running the scheduled function <"
453       << func.name << ">: " << exceptionStr(ex);
454   }
455
456   // Re-acquire the lock
457   lock.lock();
458
459   if (!currentFunction_) {
460     // The function was cancelled while we were running it.
461     // We shouldn't reschedule it;
462     cancellingCurrentFunction_ = false;
463     return;
464   }
465   if (currentFunction_->runOnce) {
466     // Don't reschedule if the function only needed to run once.
467     currentFunction_ = nullptr;
468     return;
469   }
470   // Clear currentFunction_
471   CHECK_EQ(currentFunction_, &func);
472   currentFunction_ = nullptr;
473
474   // Re-insert the function into our functions_ heap.
475   // We only maintain the heap property while running_ is set.  (running_ may
476   // have been cleared while we were invoking the user's function.)
477   functions_.push_back(std::move(func));
478   if (running_) {
479     std::push_heap(functions_.begin(), functions_.end(), fnCmp_);
480   }
481 }
482
483 void FunctionScheduler::addFunctionToHeap(
484     const std::unique_lock<std::mutex>& lock,
485     RepeatFunc&& func) {
486   // This function should only be called with mutex_ already locked.
487   DCHECK(lock.mutex() == &mutex_);
488   DCHECK(lock.owns_lock());
489
490   functions_.emplace_back(std::move(func));
491   if (running_) {
492     functions_.back().resetNextRunTime(steady_clock::now());
493     std::push_heap(functions_.begin(), functions_.end(), fnCmp_);
494     // Signal the running thread to wake up and see if it needs to change
495     // its current scheduling decision.
496     runningCondvar_.notify_one();
497   }
498 }
499
500 void FunctionScheduler::setThreadName(StringPiece threadName) {
501   std::unique_lock<std::mutex> l(mutex_);
502   threadName_ = threadName.str();
503 }
504
505 }