*/
#include <folly/experimental/FunctionScheduler.h>
-#include <folly/ThreadName.h>
+
+#include <random>
+
#include <folly/Conv.h>
+#include <folly/Random.h>
#include <folly/String.h>
+#include <folly/ThreadName.h>
-using namespace std;
using std::chrono::milliseconds;
using std::chrono::steady_clock;
namespace folly {
-FunctionScheduler::FunctionScheduler() {
-}
+namespace {
+
+struct ConstIntervalFunctor {
+ const milliseconds constInterval;
+
+ explicit ConstIntervalFunctor(milliseconds interval)
+ : constInterval(interval) {
+ if (interval < milliseconds::zero()) {
+ throw std::invalid_argument(
+ "FunctionScheduler: "
+ "time interval must be non-negative");
+ }
+ }
+
+ milliseconds operator()() const { return constInterval; }
+};
+
+struct PoissonDistributionFunctor {
+ std::default_random_engine generator;
+ std::poisson_distribution<int> poissonRandom;
+
+ explicit PoissonDistributionFunctor(double meanPoissonMs)
+ : poissonRandom(meanPoissonMs) {
+ if (meanPoissonMs < 0.0) {
+ throw std::invalid_argument(
+ "FunctionScheduler: "
+ "Poisson mean interval must be non-negative");
+ }
+ }
+
+ milliseconds operator()() { return milliseconds(poissonRandom(generator)); }
+};
+
+struct UniformDistributionFunctor {
+ std::default_random_engine generator;
+ std::uniform_int_distribution<> dist;
+
+ UniformDistributionFunctor(milliseconds minInterval, milliseconds maxInterval)
+ : generator(Random::rand32()),
+ dist(minInterval.count(), maxInterval.count()) {
+ if (minInterval > maxInterval) {
+ throw std::invalid_argument(
+ "FunctionScheduler: "
+ "min time interval must be less or equal than max interval");
+ }
+ if (minInterval < milliseconds::zero()) {
+ throw std::invalid_argument(
+ "FunctionScheduler: "
+ "time interval must be non-negative");
+ }
+ }
+
+ milliseconds operator()() { return milliseconds(dist(generator)); }
+};
+
+} // anonymous namespace
+
+FunctionScheduler::FunctionScheduler() {}
FunctionScheduler::~FunctionScheduler() {
// make sure to stop the thread (if running)
milliseconds interval,
StringPiece nameID,
milliseconds startDelay) {
- LatencyDistribution latencyDistr(false, 0.0);
- addFunction(cb, interval, latencyDistr, nameID, startDelay);
+ addFunctionGenericDistribution(
+ cb,
+ IntervalDistributionFunc(ConstIntervalFunctor(interval)),
+ nameID.str(),
+ to<std::string>(interval.count(), "ms"),
+ startDelay);
}
void FunctionScheduler::addFunction(const std::function<void()>& cb,
const LatencyDistribution& latencyDistr,
StringPiece nameID,
milliseconds startDelay) {
- if (interval < milliseconds::zero()) {
- throw std::invalid_argument("FunctionScheduler: "
- "time interval must be non-negative");
+ if (latencyDistr.isPoisson) {
+ addFunctionGenericDistribution(
+ cb,
+ IntervalDistributionFunc(
+ PoissonDistributionFunctor(latencyDistr.poissonMean)),
+ nameID.str(),
+ to<std::string>(latencyDistr.poissonMean, "ms (Poisson mean)"),
+ startDelay);
+ } else {
+ addFunction(cb, interval, nameID, startDelay);
+ }
+}
+
+void FunctionScheduler::addFunctionUniformDistribution(
+ const std::function<void()>& cb,
+ milliseconds minInterval,
+ milliseconds maxInterval,
+ StringPiece nameID,
+ milliseconds startDelay) {
+ addFunctionGenericDistribution(
+ cb,
+ IntervalDistributionFunc(
+ UniformDistributionFunctor(minInterval, maxInterval)),
+ nameID.str(),
+ to<std::string>(
+ "[", minInterval.count(), " , ", maxInterval.count(), "] ms"),
+ startDelay);
+}
+
+void FunctionScheduler::addFunctionGenericDistribution(
+ const std::function<void()>& cb,
+ const IntervalDistributionFunc& intervalFunc,
+ const std::string& nameID,
+ const std::string& intervalDescr,
+ milliseconds startDelay) {
+ if (!cb) {
+ throw std::invalid_argument(
+ "FunctionScheduler: Scheduled function must be set");
+ }
+ if (!intervalFunc) {
+ throw std::invalid_argument(
+ "FunctionScheduler: interval distribution function must be set");
}
if (startDelay < milliseconds::zero()) {
- throw std::invalid_argument("FunctionScheduler: "
- "start delay must be non-negative");
+ throw std::invalid_argument(
+ "FunctionScheduler: start delay must be non-negative");
}
std::lock_guard<std::mutex> l(mutex_);
// check if the nameID is unique
for (const auto& f : functions_) {
if (f.isValid() && f.name == nameID) {
- throw std::invalid_argument(to<string>(
- "FunctionScheduler: a function named \"", nameID,
- "\" already exists"));
+ throw std::invalid_argument(
+ to<std::string>("FunctionScheduler: a function named \"",
+ nameID,
+ "\" already exists"));
}
}
if (currentFunction_ && currentFunction_->name == nameID) {
- throw std::invalid_argument(to<string>(
- "FunctionScheduler: a function named \"", nameID,
- "\" already exists"));
+ throw std::invalid_argument(to<std::string>(
+ "FunctionScheduler: a function named \"", nameID, "\" already exists"));
}
- functions_.emplace_back(cb, interval, nameID.str(), startDelay,
- latencyDistr.isPoisson, latencyDistr.poissonMean);
+ functions_.emplace_back(cb, intervalFunc, nameID, intervalDescr, startDelay);
if (running_) {
- functions_.back().setNextRunTime(steady_clock::now() + startDelay);
+ functions_.back().resetNextRunTime(steady_clock::now());
std::push_heap(functions_.begin(), functions_.end(), fnCmp_);
// Signal the running thread to wake up and see if it needs to change it's
// current scheduling decision.
// Reset the next run time. for all functions.
// note: this is needed since one can shutdown() and start() again
for (auto& f : functions_) {
- f.setNextRunTime(now + f.startDelay);
- VLOG(1) << " - func: "
- << (f.name.empty() ? "(anon)" : f.name.c_str())
- << ", period = " << f.timeInterval.count()
- << "ms, delay = " << f.startDelay.count() << "ms";
+ f.resetNextRunTime(now);
+ VLOG(1) << " - func: " << (f.name.empty() ? "(anon)" : f.name.c_str())
+ << ", period = " << f.intervalDescr
+ << ", delay = " << f.startDelay.count() << "ms";
}
std::make_heap(functions_.begin(), functions_.end(), fnCmp_);
// maintain the heap property on functions_ while mutex_ is unlocked.
RepeatFunc func(std::move(functions_.back()));
functions_.pop_back();
+ if (!func.cb) {
+ VLOG(5) << func.name << "function has been canceled while waiting";
+ return;
+ }
currentFunction_ = &func;
- // Update the function's run time, and re-insert it into the heap.
+ // Update the function's next run time.
if (steady_) {
// This allows scheduler to catch up
- func.lastRunTime += func.timeInterval;
+ func.setNextRunTimeSteady();
} else {
- // Note that we adjust lastRunTime to the current time where we started the
- // function call, rather than the time when the function finishes.
+ // Note that we set nextRunTime based on the current time where we started
+ // the function call, rather than the time when the function finishes.
// This ensures that we call the function once every time interval, as
// opposed to waiting time interval seconds between calls. (These can be
// different if the function takes a significant amount of time to run.)
- func.lastRunTime = now;
+ func.setNextRunTimeStrict(now);
}
// Release the lock while we invoke the user's function
// Re-insert the function into our functions_ heap.
// We only maintain the heap property while running_ is set. (running_ may
// have been cleared while we were invoking the user's function.)
- if (func.isPoissonDistr) {
- func.setTimeIntervalPoissonDistr();
- }
functions_.push_back(std::move(func));
if (running_) {
std::push_heap(functions_.begin(), functions_.end(), fnCmp_);