X-Git-Url: http://plrg.eecs.uci.edu/git/?p=folly.git;a=blobdiff_plain;f=folly%2Fexperimental%2FFunctionScheduler.cpp;h=69af28abc2d0b0fdee2733cc5063fff60e8f0c66;hp=e9ea7543c1eb9f4ec3263239f9e1925c8da0b673;hb=3e6ccd5c48456a86f19e1f3022545b3a2b52786e;hpb=6699f91a5664b78f767904ccdfcfdb0e23b44865 diff --git a/folly/experimental/FunctionScheduler.cpp b/folly/experimental/FunctionScheduler.cpp index e9ea7543..69af28ab 100644 --- a/folly/experimental/FunctionScheduler.cpp +++ b/folly/experimental/FunctionScheduler.cpp @@ -15,18 +15,77 @@ */ #include -#include + +#include + #include +#include #include +#include -using namespace std; using std::chrono::milliseconds; using std::chrono::steady_clock; namespace folly { -FunctionScheduler::FunctionScheduler() { -} +namespace { + +struct ConstIntervalFunctor { + const milliseconds constInterval; + + explicit ConstIntervalFunctor(milliseconds interval) + : constInterval(interval) { + if (interval < milliseconds::zero()) { + throw std::invalid_argument( + "FunctionScheduler: " + "time interval must be non-negative"); + } + } + + milliseconds operator()() const { return constInterval; } +}; + +struct PoissonDistributionFunctor { + std::default_random_engine generator; + std::poisson_distribution poissonRandom; + + explicit PoissonDistributionFunctor(double meanPoissonMs) + : poissonRandom(meanPoissonMs) { + if (meanPoissonMs < 0.0) { + throw std::invalid_argument( + "FunctionScheduler: " + "Poisson mean interval must be non-negative"); + } + } + + milliseconds operator()() { return milliseconds(poissonRandom(generator)); } +}; + +struct UniformDistributionFunctor { + std::default_random_engine generator; + std::uniform_int_distribution<> dist; + + UniformDistributionFunctor(milliseconds minInterval, milliseconds maxInterval) + : generator(Random::rand32()), + dist(minInterval.count(), maxInterval.count()) { + if (minInterval > maxInterval) { + throw std::invalid_argument( + "FunctionScheduler: " + "min time interval must be less or equal than max interval"); + } + if (minInterval < milliseconds::zero()) { + throw std::invalid_argument( + "FunctionScheduler: " + "time interval must be non-negative"); + } + } + + milliseconds operator()() { return milliseconds(dist(generator)); } +}; + +} // anonymous namespace + +FunctionScheduler::FunctionScheduler() {} FunctionScheduler::~FunctionScheduler() { // make sure to stop the thread (if running) @@ -37,8 +96,12 @@ void FunctionScheduler::addFunction(const std::function& cb, milliseconds interval, StringPiece nameID, milliseconds startDelay) { - LatencyDistribution latencyDistr(false, 0.0); - addFunction(cb, interval, latencyDistr, nameID, startDelay); + addFunctionGenericDistribution( + cb, + IntervalDistributionFunc(ConstIntervalFunctor(interval)), + nameID.str(), + to(interval.count(), "ms"), + startDelay); } void FunctionScheduler::addFunction(const std::function& cb, @@ -46,34 +109,72 @@ void FunctionScheduler::addFunction(const std::function& cb, const LatencyDistribution& latencyDistr, StringPiece nameID, milliseconds startDelay) { - if (interval < milliseconds::zero()) { - throw std::invalid_argument("FunctionScheduler: " - "time interval must be non-negative"); + if (latencyDistr.isPoisson) { + addFunctionGenericDistribution( + cb, + IntervalDistributionFunc( + PoissonDistributionFunctor(latencyDistr.poissonMean)), + nameID.str(), + to(latencyDistr.poissonMean, "ms (Poisson mean)"), + startDelay); + } else { + addFunction(cb, interval, nameID, startDelay); + } +} + +void FunctionScheduler::addFunctionUniformDistribution( + const std::function& cb, + milliseconds minInterval, + milliseconds maxInterval, + StringPiece nameID, + milliseconds startDelay) { + addFunctionGenericDistribution( + cb, + IntervalDistributionFunc( + UniformDistributionFunctor(minInterval, maxInterval)), + nameID.str(), + to( + "[", minInterval.count(), " , ", maxInterval.count(), "] ms"), + startDelay); +} + +void FunctionScheduler::addFunctionGenericDistribution( + const std::function& cb, + const IntervalDistributionFunc& intervalFunc, + const std::string& nameID, + const std::string& intervalDescr, + milliseconds startDelay) { + if (!cb) { + throw std::invalid_argument( + "FunctionScheduler: Scheduled function must be set"); + } + if (!intervalFunc) { + throw std::invalid_argument( + "FunctionScheduler: interval distribution function must be set"); } if (startDelay < milliseconds::zero()) { - throw std::invalid_argument("FunctionScheduler: " - "start delay must be non-negative"); + throw std::invalid_argument( + "FunctionScheduler: start delay must be non-negative"); } std::lock_guard l(mutex_); // check if the nameID is unique for (const auto& f : functions_) { if (f.isValid() && f.name == nameID) { - throw std::invalid_argument(to( - "FunctionScheduler: a function named \"", nameID, - "\" already exists")); + throw std::invalid_argument( + to("FunctionScheduler: a function named \"", + nameID, + "\" already exists")); } } if (currentFunction_ && currentFunction_->name == nameID) { - throw std::invalid_argument(to( - "FunctionScheduler: a function named \"", nameID, - "\" already exists")); + throw std::invalid_argument(to( + "FunctionScheduler: a function named \"", nameID, "\" already exists")); } - functions_.emplace_back(cb, interval, nameID.str(), startDelay, - latencyDistr.isPoisson, latencyDistr.poissonMean); + functions_.emplace_back(cb, intervalFunc, nameID, intervalDescr, startDelay); if (running_) { - functions_.back().setNextRunTime(steady_clock::now() + startDelay); + functions_.back().resetNextRunTime(steady_clock::now()); std::push_heap(functions_.begin(), functions_.end(), fnCmp_); // Signal the running thread to wake up and see if it needs to change it's // current scheduling decision. @@ -140,11 +241,10 @@ bool FunctionScheduler::start() { // Reset the next run time. for all functions. // note: this is needed since one can shutdown() and start() again for (auto& f : functions_) { - f.setNextRunTime(now + f.startDelay); - VLOG(1) << " - func: " - << (f.name.empty() ? "(anon)" : f.name.c_str()) - << ", period = " << f.timeInterval.count() - << "ms, delay = " << f.startDelay.count() << "ms"; + f.resetNextRunTime(now); + VLOG(1) << " - func: " << (f.name.empty() ? "(anon)" : f.name.c_str()) + << ", period = " << f.intervalDescr + << ", delay = " << f.startDelay.count() << "ms"; } std::make_heap(functions_.begin(), functions_.end(), fnCmp_); @@ -217,19 +317,23 @@ void FunctionScheduler::runOneFunction(std::unique_lock& lock, // maintain the heap property on functions_ while mutex_ is unlocked. RepeatFunc func(std::move(functions_.back())); functions_.pop_back(); + if (!func.cb) { + VLOG(5) << func.name << "function has been canceled while waiting"; + return; + } currentFunction_ = &func; - // Update the function's run time, and re-insert it into the heap. + // Update the function's next run time. if (steady_) { // This allows scheduler to catch up - func.lastRunTime += func.timeInterval; + func.setNextRunTimeSteady(); } else { - // Note that we adjust lastRunTime to the current time where we started the - // function call, rather than the time when the function finishes. + // Note that we set nextRunTime based on the current time where we started + // the function call, rather than the time when the function finishes. // This ensures that we call the function once every time interval, as // opposed to waiting time interval seconds between calls. (These can be // different if the function takes a significant amount of time to run.) - func.lastRunTime = now; + func.setNextRunTimeStrict(now); } // Release the lock while we invoke the user's function @@ -259,9 +363,6 @@ void FunctionScheduler::runOneFunction(std::unique_lock& lock, // Re-insert the function into our functions_ heap. // We only maintain the heap property while running_ is set. (running_ may // have been cleared while we were invoking the user's function.) - if (func.isPoissonDistr) { - func.setTimeIntervalPoissonDistr(); - } functions_.push_back(std::move(func)); if (running_) { std::push_heap(functions_.begin(), functions_.end(), fnCmp_);