/*
- * Copyright 2015 Facebook, Inc.
+ * Copyright 2017 Facebook, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
*/
#include <folly/experimental/FunctionScheduler.h>
-#include <folly/ThreadName.h>
+
+#include <random>
+
#include <folly/Conv.h>
+#include <folly/Random.h>
#include <folly/String.h>
+#include <folly/ThreadName.h>
-#ifdef _POSIX_MONOTONIC_CLOCK
-#define FOLLY_TIME_MONOTONIC_CLOCK CLOCK_MONOTONIC
-#else
-#define FOLLY_TIME_MONOTONIC_CLOCK CLOCK_REALTIME
-#endif
-
-using namespace std;
-using std::chrono::seconds;
using std::chrono::milliseconds;
+using std::chrono::steady_clock;
+
+namespace folly {
-static milliseconds nowInMS() {
- struct timespec ts /*= void*/;
- if (clock_gettime(FOLLY_TIME_MONOTONIC_CLOCK, &ts)) {
- // Only possible failures are EFAULT or EINVAL, both practically
- // impossible. But an assert can't hurt.
- assert(false);
+namespace {
+
+struct ConstIntervalFunctor {
+ const milliseconds constInterval;
+
+ explicit ConstIntervalFunctor(milliseconds interval)
+ : constInterval(interval) {
+ if (interval < milliseconds::zero()) {
+ throw std::invalid_argument(
+ "FunctionScheduler: "
+ "time interval must be non-negative");
+ }
}
- return milliseconds(
- static_cast<int64_t>(ts.tv_sec * 1000.0 + ts.tv_nsec / 1000000.0 + 0.5));
-}
-namespace folly {
+ milliseconds operator()() const { return constInterval; }
+};
-FunctionScheduler::FunctionScheduler() {
-}
+struct PoissonDistributionFunctor {
+ std::default_random_engine generator;
+ std::poisson_distribution<int> poissonRandom;
+
+ explicit PoissonDistributionFunctor(double meanPoissonMs)
+ : poissonRandom(meanPoissonMs) {
+ if (meanPoissonMs < 0.0) {
+ throw std::invalid_argument(
+ "FunctionScheduler: "
+ "Poisson mean interval must be non-negative");
+ }
+ }
+
+ milliseconds operator()() { return milliseconds(poissonRandom(generator)); }
+};
+
+struct UniformDistributionFunctor {
+ std::default_random_engine generator;
+ std::uniform_int_distribution<milliseconds::rep> dist;
+
+ UniformDistributionFunctor(milliseconds minInterval, milliseconds maxInterval)
+ : generator(Random::rand32()),
+ dist(minInterval.count(), maxInterval.count()) {
+ if (minInterval > maxInterval) {
+ throw std::invalid_argument(
+ "FunctionScheduler: "
+ "min time interval must be less or equal than max interval");
+ }
+ if (minInterval < milliseconds::zero()) {
+ throw std::invalid_argument(
+ "FunctionScheduler: "
+ "time interval must be non-negative");
+ }
+ }
+
+ milliseconds operator()() { return milliseconds(dist(generator)); }
+};
+
+} // anonymous namespace
+
+FunctionScheduler::FunctionScheduler() {}
FunctionScheduler::~FunctionScheduler() {
// make sure to stop the thread (if running)
shutdown();
}
-void FunctionScheduler::addFunction(const std::function<void()>& cb,
+void FunctionScheduler::addFunction(Function<void()>&& cb,
milliseconds interval,
StringPiece nameID,
milliseconds startDelay) {
- LatencyDistribution latencyDistr(false, 0.0);
- addFunctionInternal(cb, interval,
- latencyDistr, nameID, startDelay);
+ addFunctionInternal(
+ std::move(cb),
+ ConstIntervalFunctor(interval),
+ nameID.str(),
+ to<std::string>(interval.count(), "ms"),
+ startDelay,
+ false /*runOnce*/);
}
-void FunctionScheduler::addFunctionInternal(const std::function<void()>& cb,
+void FunctionScheduler::addFunction(Function<void()>&& cb,
milliseconds interval,
const LatencyDistribution& latencyDistr,
StringPiece nameID,
milliseconds startDelay) {
- if (interval < milliseconds::zero()) {
- throw std::invalid_argument("FunctionScheduler: "
- "time interval must be non-negative");
+ if (latencyDistr.isPoisson) {
+ addFunctionInternal(
+ std::move(cb),
+ PoissonDistributionFunctor(latencyDistr.poissonMean),
+ nameID.str(),
+ to<std::string>(latencyDistr.poissonMean, "ms (Poisson mean)"),
+ startDelay,
+ false /*runOnce*/);
+ } else {
+ addFunction(std::move(cb), interval, nameID, startDelay);
+ }
+}
+
+void FunctionScheduler::addFunctionOnce(
+ Function<void()>&& cb,
+ StringPiece nameID,
+ milliseconds startDelay) {
+ addFunctionInternal(
+ std::move(cb),
+ ConstIntervalFunctor(milliseconds::zero()),
+ nameID.str(),
+ "once",
+ startDelay,
+ true /*runOnce*/);
+}
+
+void FunctionScheduler::addFunctionUniformDistribution(
+ Function<void()>&& cb,
+ milliseconds minInterval,
+ milliseconds maxInterval,
+ StringPiece nameID,
+ milliseconds startDelay) {
+ addFunctionInternal(
+ std::move(cb),
+ UniformDistributionFunctor(minInterval, maxInterval),
+ nameID.str(),
+ to<std::string>(
+ "[", minInterval.count(), " , ", maxInterval.count(), "] ms"),
+ startDelay,
+ false /*runOnce*/);
+}
+
+void FunctionScheduler::addFunctionGenericDistribution(
+ Function<void()>&& cb,
+ IntervalDistributionFunc&& intervalFunc,
+ const std::string& nameID,
+ const std::string& intervalDescr,
+ milliseconds startDelay) {
+ addFunctionInternal(
+ std::move(cb),
+ std::move(intervalFunc),
+ nameID,
+ intervalDescr,
+ startDelay,
+ false /*runOnce*/);
+}
+
+void FunctionScheduler::addFunctionInternal(
+ Function<void()>&& cb,
+ IntervalDistributionFunc&& intervalFunc,
+ const std::string& nameID,
+ const std::string& intervalDescr,
+ milliseconds startDelay,
+ bool runOnce) {
+ if (!cb) {
+ throw std::invalid_argument(
+ "FunctionScheduler: Scheduled function must be set");
+ }
+ if (!intervalFunc) {
+ throw std::invalid_argument(
+ "FunctionScheduler: interval distribution function must be set");
}
if (startDelay < milliseconds::zero()) {
- throw std::invalid_argument("FunctionScheduler: "
- "start delay must be non-negative");
+ throw std::invalid_argument(
+ "FunctionScheduler: start delay must be non-negative");
}
- std::lock_guard<std::mutex> l(mutex_);
+ std::unique_lock<std::mutex> l(mutex_);
// check if the nameID is unique
for (const auto& f : functions_) {
if (f.isValid() && f.name == nameID) {
- throw std::invalid_argument(to<string>(
- "FunctionScheduler: a function named \"", nameID,
- "\" already exists"));
+ throw std::invalid_argument(
+ to<std::string>("FunctionScheduler: a function named \"",
+ nameID,
+ "\" already exists"));
}
}
if (currentFunction_ && currentFunction_->name == nameID) {
- throw std::invalid_argument(to<string>(
- "FunctionScheduler: a function named \"", nameID,
- "\" already exists"));
+ throw std::invalid_argument(to<std::string>(
+ "FunctionScheduler: a function named \"", nameID, "\" already exists"));
}
- functions_.emplace_back(cb, interval, nameID.str(), startDelay,
- latencyDistr.isPoisson, latencyDistr.poissonMean);
- if (running_) {
- functions_.back().setNextRunTime(nowInMS() + startDelay);
- std::push_heap(functions_.begin(), functions_.end(), fnCmp_);
- // Signal the running thread to wake up and see if it needs to change it's
- // current scheduling decision.
- runningCondvar_.notify_one();
+ addFunctionToHeap(
+ l,
+ RepeatFunc(
+ std::move(cb),
+ std::move(intervalFunc),
+ nameID,
+ intervalDescr,
+ startDelay,
+ runOnce));
+}
+
+bool FunctionScheduler::cancelFunctionWithLock(
+ std::unique_lock<std::mutex>& lock,
+ StringPiece nameID) {
+ CHECK_EQ(lock.owns_lock(), true);
+ if (currentFunction_ && currentFunction_->name == nameID) {
+ // This function is currently being run. Clear currentFunction_
+ // The running thread will see this and won't reschedule the function.
+ currentFunction_ = nullptr;
+ cancellingCurrentFunction_ = true;
+ return true;
}
+ return false;
}
bool FunctionScheduler::cancelFunction(StringPiece nameID) {
std::unique_lock<std::mutex> l(mutex_);
- if (currentFunction_ && currentFunction_->name == nameID) {
- // This function is currently being run. Clear currentFunction_
- // The running thread will see this and won't reschedule the function.
- currentFunction_ = nullptr;
+ if (cancelFunctionWithLock(l, nameID)) {
+ return true;
+ }
+
+ for (auto it = functions_.begin(); it != functions_.end(); ++it) {
+ if (it->isValid() && it->name == nameID) {
+ cancelFunction(l, it);
+ return true;
+ }
+ }
+ return false;
+}
+
+bool FunctionScheduler::cancelFunctionAndWait(StringPiece nameID) {
+ std::unique_lock<std::mutex> l(mutex_);
+
+ if (cancelFunctionWithLock(l, nameID)) {
+ runningCondvar_.wait(l, [this]() { return !cancellingCurrentFunction_; });
return true;
}
// heap. Unfortunately it isn't part of the standard API.
//
// For now we just leave the RepeatFunc in our heap, but mark it as unused.
- // When it's nextTimeInterval comes up, the runner thread will pop it from
+ // When its nextTimeInterval comes up, the runner thread will pop it from
// the heap and simply throw it away.
it->cancel();
} else {
}
}
+bool FunctionScheduler::cancelAllFunctionsWithLock(
+ std::unique_lock<std::mutex>& lock) {
+ CHECK_EQ(lock.owns_lock(), true);
+ functions_.clear();
+ if (currentFunction_) {
+ cancellingCurrentFunction_ = true;
+ }
+ currentFunction_ = nullptr;
+ return cancellingCurrentFunction_;
+}
+
void FunctionScheduler::cancelAllFunctions() {
std::unique_lock<std::mutex> l(mutex_);
- functions_.clear();
+ cancelAllFunctionsWithLock(l);
+}
+
+void FunctionScheduler::cancelAllFunctionsAndWait() {
+ std::unique_lock<std::mutex> l(mutex_);
+ if (cancelAllFunctionsWithLock(l)) {
+ runningCondvar_.wait(l, [this]() { return !cancellingCurrentFunction_; });
+ }
+}
+
+bool FunctionScheduler::resetFunctionTimer(StringPiece nameID) {
+ std::unique_lock<std::mutex> l(mutex_);
+ if (currentFunction_ && currentFunction_->name == nameID) {
+ RepeatFunc* funcPtrCopy = currentFunction_;
+ // This function is currently being run. Clear currentFunction_
+ // to avoid rescheduling it, and add the function again to honor the
+ // startDelay.
+ currentFunction_ = nullptr;
+ addFunctionToHeap(l, std::move(*funcPtrCopy));
+ return true;
+ }
+
+ // Since __adjust_heap() isn't a part of the standard API, there's no way to
+ // fix the heap ordering if we adjust the key (nextRunTime) for the existing
+ // RepeatFunc. Instead, we just cancel it and add an identical object.
+ for (auto it = functions_.begin(); it != functions_.end(); ++it) {
+ if (it->isValid() && it->name == nameID) {
+ RepeatFunc funcCopy(std::move(*it));
+ cancelFunction(l, it);
+ addFunctionToHeap(l, std::move(funcCopy));
+ return true;
+ }
+ }
+ return false;
}
bool FunctionScheduler::start() {
return false;
}
- running_ = true;
-
VLOG(1) << "Starting FunctionScheduler with " << functions_.size()
<< " functions.";
- milliseconds now(nowInMS());
+ auto now = steady_clock::now();
// Reset the next run time. for all functions.
// note: this is needed since one can shutdown() and start() again
for (auto& f : functions_) {
- f.setNextRunTime(now + f.startDelay);
- VLOG(1) << " - func: "
- << (f.name.empty() ? "(anon)" : f.name.c_str())
- << ", period = " << f.timeInterval.count()
- << "ms, delay = " << f.startDelay.count() << "ms";
+ f.resetNextRunTime(now);
+ VLOG(1) << " - func: " << (f.name.empty() ? "(anon)" : f.name.c_str())
+ << ", period = " << f.intervalDescr
+ << ", delay = " << f.startDelay.count() << "ms";
}
std::make_heap(functions_.begin(), functions_.end(), fnCmp_);
thread_ = std::thread([&] { this->run(); });
+ running_ = true;
+
return true;
}
-void FunctionScheduler::shutdown() {
+bool FunctionScheduler::shutdown() {
{
std::lock_guard<std::mutex> g(mutex_);
if (!running_) {
- return;
+ return false;
}
running_ = false;
runningCondvar_.notify_one();
}
thread_.join();
+ return true;
}
void FunctionScheduler::run() {
continue;
}
- milliseconds now(nowInMS());
+ auto now = steady_clock::now();
// Move the next function to run to the end of functions_
std::pop_heap(functions_.begin(), functions_.end(), fnCmp_);
if (sleepTime < milliseconds::zero()) {
// We need to run this function now
runOneFunction(lock, now);
+ runningCondvar_.notify_all();
} else {
// Re-add the function to the heap, and wait until we actually
// need to run it.
}
void FunctionScheduler::runOneFunction(std::unique_lock<std::mutex>& lock,
- std::chrono::milliseconds now) {
+ steady_clock::time_point now) {
DCHECK(lock.mutex() == &mutex_);
DCHECK(lock.owns_lock());
// maintain the heap property on functions_ while mutex_ is unlocked.
RepeatFunc func(std::move(functions_.back()));
functions_.pop_back();
+ if (!func.cb) {
+ VLOG(5) << func.name << "function has been canceled while waiting";
+ return;
+ }
currentFunction_ = &func;
- // Update the function's run time, and re-insert it into the heap.
+ // Update the function's next run time.
if (steady_) {
// This allows scheduler to catch up
- func.lastRunTime += func.timeInterval;
+ func.setNextRunTimeSteady();
} else {
- // Note that we adjust lastRunTime to the current time where we started the
- // function call, rather than the time when the function finishes.
+ // Note that we set nextRunTime based on the current time where we started
+ // the function call, rather than the time when the function finishes.
// This ensures that we call the function once every time interval, as
// opposed to waiting time interval seconds between calls. (These can be
// different if the function takes a significant amount of time to run.)
- func.lastRunTime = now;
+ func.setNextRunTimeStrict(now);
}
// Release the lock while we invoke the user's function
if (!currentFunction_) {
// The function was cancelled while we were running it.
// We shouldn't reschedule it;
+ cancellingCurrentFunction_ = false;
+ return;
+ }
+ if (currentFunction_->runOnce) {
+ // Don't reschedule if the function only needed to run once.
+ currentFunction_ = nullptr;
return;
}
// Clear currentFunction_
// Re-insert the function into our functions_ heap.
// We only maintain the heap property while running_ is set. (running_ may
// have been cleared while we were invoking the user's function.)
- if (func.isPoissonDistr) {
- func.setTimeIntervalPoissonDistr();
- }
functions_.push_back(std::move(func));
if (running_) {
std::push_heap(functions_.begin(), functions_.end(), fnCmp_);
}
}
+void FunctionScheduler::addFunctionToHeap(
+ const std::unique_lock<std::mutex>& lock,
+ RepeatFunc&& func) {
+ // This function should only be called with mutex_ already locked.
+ DCHECK(lock.mutex() == &mutex_);
+ DCHECK(lock.owns_lock());
+
+ functions_.emplace_back(std::move(func));
+ if (running_) {
+ functions_.back().resetNextRunTime(steady_clock::now());
+ std::push_heap(functions_.begin(), functions_.end(), fnCmp_);
+ // Signal the running thread to wake up and see if it needs to change
+ // its current scheduling decision.
+ runningCondvar_.notify_one();
+ }
+}
+
void FunctionScheduler::setThreadName(StringPiece threadName) {
std::unique_lock<std::mutex> l(mutex_);
threadName_ = threadName.str();