X-Git-Url: http://plrg.eecs.uci.edu/git/?a=blobdiff_plain;f=folly%2Fexperimental%2FFunctionScheduler.cpp;h=2e0ee6ebea9c78f24d0b908548f09e50ecd900f1;hb=108473868b7d543fcb4f7108cbcc75dc871cd833;hp=69af28abc2d0b0fdee2733cc5063fff60e8f0c66;hpb=3e6ccd5c48456a86f19e1f3022545b3a2b52786e;p=folly.git diff --git a/folly/experimental/FunctionScheduler.cpp b/folly/experimental/FunctionScheduler.cpp index 69af28ab..2e0ee6eb 100644 --- a/folly/experimental/FunctionScheduler.cpp +++ b/folly/experimental/FunctionScheduler.cpp @@ -1,5 +1,5 @@ /* - * Copyright 2015 Facebook, Inc. + * Copyright 2016 Facebook, Inc. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -92,46 +92,44 @@ FunctionScheduler::~FunctionScheduler() { shutdown(); } -void FunctionScheduler::addFunction(const std::function& cb, +void FunctionScheduler::addFunction(Function&& cb, milliseconds interval, StringPiece nameID, milliseconds startDelay) { addFunctionGenericDistribution( - cb, - IntervalDistributionFunc(ConstIntervalFunctor(interval)), + std::move(cb), + ConstIntervalFunctor(interval), nameID.str(), to(interval.count(), "ms"), startDelay); } -void FunctionScheduler::addFunction(const std::function& cb, +void FunctionScheduler::addFunction(Function&& cb, milliseconds interval, const LatencyDistribution& latencyDistr, StringPiece nameID, milliseconds startDelay) { if (latencyDistr.isPoisson) { addFunctionGenericDistribution( - cb, - IntervalDistributionFunc( - PoissonDistributionFunctor(latencyDistr.poissonMean)), + std::move(cb), + PoissonDistributionFunctor(latencyDistr.poissonMean), nameID.str(), to(latencyDistr.poissonMean, "ms (Poisson mean)"), startDelay); } else { - addFunction(cb, interval, nameID, startDelay); + addFunction(std::move(cb), interval, nameID, startDelay); } } void FunctionScheduler::addFunctionUniformDistribution( - const std::function& cb, + Function&& cb, milliseconds minInterval, milliseconds maxInterval, StringPiece nameID, milliseconds startDelay) { addFunctionGenericDistribution( - cb, - IntervalDistributionFunc( - UniformDistributionFunctor(minInterval, maxInterval)), + std::move(cb), + UniformDistributionFunctor(minInterval, maxInterval), nameID.str(), to( "[", minInterval.count(), " , ", maxInterval.count(), "] ms"), @@ -139,8 +137,8 @@ void FunctionScheduler::addFunctionUniformDistribution( } void FunctionScheduler::addFunctionGenericDistribution( - const std::function& cb, - const IntervalDistributionFunc& intervalFunc, + Function&& cb, + IntervalDistributionFunc&& intervalFunc, const std::string& nameID, const std::string& intervalDescr, milliseconds startDelay) { @@ -157,7 +155,7 @@ void FunctionScheduler::addFunctionGenericDistribution( "FunctionScheduler: start delay must be non-negative"); } - std::lock_guard l(mutex_); + std::unique_lock l(mutex_); // check if the nameID is unique for (const auto& f : functions_) { if (f.isValid() && f.name == nameID) { @@ -172,21 +170,21 @@ void FunctionScheduler::addFunctionGenericDistribution( "FunctionScheduler: a function named \"", nameID, "\" already exists")); } - functions_.emplace_back(cb, intervalFunc, nameID, intervalDescr, startDelay); - if (running_) { - functions_.back().resetNextRunTime(steady_clock::now()); - std::push_heap(functions_.begin(), functions_.end(), fnCmp_); - // Signal the running thread to wake up and see if it needs to change it's - // current scheduling decision. - runningCondvar_.notify_one(); - } + addFunctionToHeap( + l, + RepeatFunc( + std::move(cb), + std::move(intervalFunc), + nameID, + intervalDescr, + startDelay)); } bool FunctionScheduler::cancelFunction(StringPiece nameID) { std::unique_lock l(mutex_); if (currentFunction_ && currentFunction_->name == nameID) { - // This function is currently being run. Clear currentFunction_ + // This function is currently being run. Clear currentFunction_ // The running thread will see this and won't reschedule the function. currentFunction_ = nullptr; return true; @@ -212,7 +210,7 @@ void FunctionScheduler::cancelFunction(const std::unique_lock& l, // heap. Unfortunately it isn't part of the standard API. // // For now we just leave the RepeatFunc in our heap, but mark it as unused. - // When it's nextTimeInterval comes up, the runner thread will pop it from + // When its nextTimeInterval comes up, the runner thread will pop it from // the heap and simply throw it away. it->cancel(); } else { @@ -225,6 +223,33 @@ void FunctionScheduler::cancelFunction(const std::unique_lock& l, void FunctionScheduler::cancelAllFunctions() { std::unique_lock l(mutex_); functions_.clear(); + currentFunction_ = nullptr; +} + +bool FunctionScheduler::resetFunctionTimer(StringPiece nameID) { + std::unique_lock l(mutex_); + if (currentFunction_ && currentFunction_->name == nameID) { + RepeatFunc* funcPtrCopy = currentFunction_; + // This function is currently being run. Clear currentFunction_ + // to avoid rescheduling it, and add the function again to honor the + // startDelay. + currentFunction_ = nullptr; + addFunctionToHeap(l, std::move(*funcPtrCopy)); + return true; + } + + // Since __adjust_heap() isn't a part of the standard API, there's no way to + // fix the heap ordering if we adjust the key (nextRunTime) for the existing + // RepeatFunc. Instead, we just cancel it and add an identical object. + for (auto it = functions_.begin(); it != functions_.end(); ++it) { + if (it->isValid() && it->name == nameID) { + RepeatFunc funcCopy(std::move(*it)); + cancelFunction(l, it); + addFunctionToHeap(l, std::move(funcCopy)); + return true; + } + } + return false; } bool FunctionScheduler::start() { @@ -252,17 +277,18 @@ bool FunctionScheduler::start() { return true; } -void FunctionScheduler::shutdown() { +bool FunctionScheduler::shutdown() { { std::lock_guard g(mutex_); if (!running_) { - return; + return false; } running_ = false; runningCondvar_.notify_one(); } thread_.join(); + return true; } void FunctionScheduler::run() { @@ -369,6 +395,23 @@ void FunctionScheduler::runOneFunction(std::unique_lock& lock, } } +void FunctionScheduler::addFunctionToHeap( + const std::unique_lock& lock, + RepeatFunc&& func) { + // This function should only be called with mutex_ already locked. + DCHECK(lock.mutex() == &mutex_); + DCHECK(lock.owns_lock()); + + functions_.emplace_back(std::move(func)); + if (running_) { + functions_.back().resetNextRunTime(steady_clock::now()); + std::push_heap(functions_.begin(), functions_.end(), fnCmp_); + // Signal the running thread to wake up and see if it needs to change + // its current scheduling decision. + runningCondvar_.notify_one(); + } +} + void FunctionScheduler::setThreadName(StringPiece threadName) { std::unique_lock l(mutex_); threadName_ = threadName.str();