/*
- * Copyright 2015 Facebook, Inc.
+ * Copyright 2017 Facebook, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
struct UniformDistributionFunctor {
std::default_random_engine generator;
- std::uniform_int_distribution<> dist;
+ std::uniform_int_distribution<milliseconds::rep> dist;
UniformDistributionFunctor(milliseconds minInterval, milliseconds maxInterval)
: generator(Random::rand32()),
shutdown();
}
-void FunctionScheduler::addFunction(const std::function<void()>& cb,
+void FunctionScheduler::addFunction(Function<void()>&& cb,
milliseconds interval,
StringPiece nameID,
milliseconds startDelay) {
- addFunctionGenericDistribution(
- cb,
- IntervalDistributionFunc(ConstIntervalFunctor(interval)),
+ addFunctionInternal(
+ std::move(cb),
+ ConstIntervalFunctor(interval),
nameID.str(),
to<std::string>(interval.count(), "ms"),
- startDelay);
+ startDelay,
+ false /*runOnce*/);
}
-void FunctionScheduler::addFunction(const std::function<void()>& cb,
+void FunctionScheduler::addFunction(Function<void()>&& cb,
milliseconds interval,
const LatencyDistribution& latencyDistr,
StringPiece nameID,
milliseconds startDelay) {
if (latencyDistr.isPoisson) {
- addFunctionGenericDistribution(
- cb,
- IntervalDistributionFunc(
- PoissonDistributionFunctor(latencyDistr.poissonMean)),
+ addFunctionInternal(
+ std::move(cb),
+ PoissonDistributionFunctor(latencyDistr.poissonMean),
nameID.str(),
to<std::string>(latencyDistr.poissonMean, "ms (Poisson mean)"),
- startDelay);
+ startDelay,
+ false /*runOnce*/);
} else {
- addFunction(cb, interval, nameID, startDelay);
+ addFunction(std::move(cb), interval, nameID, startDelay);
}
}
+void FunctionScheduler::addFunctionOnce(
+ Function<void()>&& cb,
+ StringPiece nameID,
+ milliseconds startDelay) {
+ addFunctionInternal(
+ std::move(cb),
+ ConstIntervalFunctor(milliseconds::zero()),
+ nameID.str(),
+ "once",
+ startDelay,
+ true /*runOnce*/);
+}
+
void FunctionScheduler::addFunctionUniformDistribution(
- const std::function<void()>& cb,
+ Function<void()>&& cb,
milliseconds minInterval,
milliseconds maxInterval,
StringPiece nameID,
milliseconds startDelay) {
- addFunctionGenericDistribution(
- cb,
- IntervalDistributionFunc(
- UniformDistributionFunctor(minInterval, maxInterval)),
+ addFunctionInternal(
+ std::move(cb),
+ UniformDistributionFunctor(minInterval, maxInterval),
nameID.str(),
to<std::string>(
"[", minInterval.count(), " , ", maxInterval.count(), "] ms"),
- startDelay);
+ startDelay,
+ false /*runOnce*/);
}
void FunctionScheduler::addFunctionGenericDistribution(
- const std::function<void()>& cb,
- const IntervalDistributionFunc& intervalFunc,
+ Function<void()>&& cb,
+ IntervalDistributionFunc&& intervalFunc,
const std::string& nameID,
const std::string& intervalDescr,
milliseconds startDelay) {
+ addFunctionInternal(
+ std::move(cb),
+ std::move(intervalFunc),
+ nameID,
+ intervalDescr,
+ startDelay,
+ false /*runOnce*/);
+}
+
+void FunctionScheduler::addFunctionInternal(
+ Function<void()>&& cb,
+ IntervalDistributionFunc&& intervalFunc,
+ const std::string& nameID,
+ const std::string& intervalDescr,
+ milliseconds startDelay,
+ bool runOnce) {
if (!cb) {
throw std::invalid_argument(
"FunctionScheduler: Scheduled function must be set");
"FunctionScheduler: start delay must be non-negative");
}
- std::lock_guard<std::mutex> l(mutex_);
+ std::unique_lock<std::mutex> l(mutex_);
// check if the nameID is unique
for (const auto& f : functions_) {
if (f.isValid() && f.name == nameID) {
"FunctionScheduler: a function named \"", nameID, "\" already exists"));
}
- functions_.emplace_back(cb, intervalFunc, nameID, intervalDescr, startDelay);
- if (running_) {
- functions_.back().resetNextRunTime(steady_clock::now());
- std::push_heap(functions_.begin(), functions_.end(), fnCmp_);
- // Signal the running thread to wake up and see if it needs to change it's
- // current scheduling decision.
- runningCondvar_.notify_one();
- }
+ addFunctionToHeap(
+ l,
+ RepeatFunc(
+ std::move(cb),
+ std::move(intervalFunc),
+ nameID,
+ intervalDescr,
+ startDelay,
+ runOnce));
}
bool FunctionScheduler::cancelFunction(StringPiece nameID) {
std::unique_lock<std::mutex> l(mutex_);
if (currentFunction_ && currentFunction_->name == nameID) {
- // This function is currently being run. Clear currentFunction_
+ // This function is currently being run. Clear currentFunction_
// The running thread will see this and won't reschedule the function.
currentFunction_ = nullptr;
return true;
return false;
}
+bool FunctionScheduler::cancelFunctionAndWait(StringPiece nameID) {
+ std::unique_lock<std::mutex> l(mutex_);
+
+ auto* currentFunction = currentFunction_;
+ if (currentFunction && currentFunction->name == nameID) {
+ runningCondvar_.wait(l, [currentFunction, this]() {
+ return currentFunction != currentFunction_;
+ });
+ }
+
+ for (auto it = functions_.begin(); it != functions_.end(); ++it) {
+ if (it->isValid() && it->name == nameID) {
+ cancelFunction(l, it);
+ return true;
+ }
+ }
+ return false;
+}
+
void FunctionScheduler::cancelFunction(const std::unique_lock<std::mutex>& l,
FunctionHeap::iterator it) {
// This function should only be called with mutex_ already locked.
// heap. Unfortunately it isn't part of the standard API.
//
// For now we just leave the RepeatFunc in our heap, but mark it as unused.
- // When it's nextTimeInterval comes up, the runner thread will pop it from
+ // When its nextTimeInterval comes up, the runner thread will pop it from
// the heap and simply throw it away.
it->cancel();
} else {
void FunctionScheduler::cancelAllFunctions() {
std::unique_lock<std::mutex> l(mutex_);
functions_.clear();
+ currentFunction_ = nullptr;
+}
+
+void FunctionScheduler::cancelAllFunctionsAndWait() {
+ std::unique_lock<std::mutex> l(mutex_);
+ if (currentFunction_) {
+ runningCondvar_.wait(l, [this]() { return currentFunction_ == nullptr; });
+ }
+ functions_.clear();
+}
+
+bool FunctionScheduler::resetFunctionTimer(StringPiece nameID) {
+ std::unique_lock<std::mutex> l(mutex_);
+ if (currentFunction_ && currentFunction_->name == nameID) {
+ RepeatFunc* funcPtrCopy = currentFunction_;
+ // This function is currently being run. Clear currentFunction_
+ // to avoid rescheduling it, and add the function again to honor the
+ // startDelay.
+ currentFunction_ = nullptr;
+ addFunctionToHeap(l, std::move(*funcPtrCopy));
+ return true;
+ }
+
+ // Since __adjust_heap() isn't a part of the standard API, there's no way to
+ // fix the heap ordering if we adjust the key (nextRunTime) for the existing
+ // RepeatFunc. Instead, we just cancel it and add an identical object.
+ for (auto it = functions_.begin(); it != functions_.end(); ++it) {
+ if (it->isValid() && it->name == nameID) {
+ RepeatFunc funcCopy(std::move(*it));
+ cancelFunction(l, it);
+ addFunctionToHeap(l, std::move(funcCopy));
+ return true;
+ }
+ }
+ return false;
}
bool FunctionScheduler::start() {
return false;
}
- running_ = true;
-
VLOG(1) << "Starting FunctionScheduler with " << functions_.size()
<< " functions.";
auto now = steady_clock::now();
std::make_heap(functions_.begin(), functions_.end(), fnCmp_);
thread_ = std::thread([&] { this->run(); });
+ running_ = true;
+
return true;
}
-void FunctionScheduler::shutdown() {
+bool FunctionScheduler::shutdown() {
{
std::lock_guard<std::mutex> g(mutex_);
if (!running_) {
- return;
+ return false;
}
running_ = false;
runningCondvar_.notify_one();
}
thread_.join();
+ return true;
}
void FunctionScheduler::run() {
if (sleepTime < milliseconds::zero()) {
// We need to run this function now
runOneFunction(lock, now);
+ runningCondvar_.notify_all();
} else {
// Re-add the function to the heap, and wait until we actually
// need to run it.
// We shouldn't reschedule it;
return;
}
+ if (currentFunction_->runOnce) {
+ // Don't reschedule if the function only needed to run once.
+ currentFunction_ = nullptr;
+ return;
+ }
// Clear currentFunction_
CHECK_EQ(currentFunction_, &func);
currentFunction_ = nullptr;
}
}
+void FunctionScheduler::addFunctionToHeap(
+ const std::unique_lock<std::mutex>& lock,
+ RepeatFunc&& func) {
+ // This function should only be called with mutex_ already locked.
+ DCHECK(lock.mutex() == &mutex_);
+ DCHECK(lock.owns_lock());
+
+ functions_.emplace_back(std::move(func));
+ if (running_) {
+ functions_.back().resetNextRunTime(steady_clock::now());
+ std::push_heap(functions_.begin(), functions_.end(), fnCmp_);
+ // Signal the running thread to wake up and see if it needs to change
+ // its current scheduling decision.
+ runningCondvar_.notify_one();
+ }
+}
+
void FunctionScheduler::setThreadName(StringPiece threadName) {
std::unique_lock<std::mutex> l(mutex_);
threadName_ = threadName.str();