2 * Copyright 2017 Facebook, Inc.
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
8 * http://www.apache.org/licenses/LICENSE-2.0
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
17 #include <folly/experimental/FunctionScheduler.h>
21 #include <folly/Conv.h>
22 #include <folly/Random.h>
23 #include <folly/String.h>
24 #include <folly/system/ThreadName.h>
26 using std::chrono::milliseconds;
27 using std::chrono::steady_clock;
33 struct ConstIntervalFunctor {
34 const milliseconds constInterval;
36 explicit ConstIntervalFunctor(milliseconds interval)
37 : constInterval(interval) {
38 if (interval < milliseconds::zero()) {
39 throw std::invalid_argument(
41 "time interval must be non-negative");
45 milliseconds operator()() const { return constInterval; }
48 struct PoissonDistributionFunctor {
49 std::default_random_engine generator;
50 std::poisson_distribution<int> poissonRandom;
52 explicit PoissonDistributionFunctor(double meanPoissonMs)
53 : poissonRandom(meanPoissonMs) {
54 if (meanPoissonMs < 0.0) {
55 throw std::invalid_argument(
57 "Poisson mean interval must be non-negative");
61 milliseconds operator()() { return milliseconds(poissonRandom(generator)); }
64 struct UniformDistributionFunctor {
65 std::default_random_engine generator;
66 std::uniform_int_distribution<milliseconds::rep> dist;
68 UniformDistributionFunctor(milliseconds minInterval, milliseconds maxInterval)
69 : generator(Random::rand32()),
70 dist(minInterval.count(), maxInterval.count()) {
71 if (minInterval > maxInterval) {
72 throw std::invalid_argument(
74 "min time interval must be less or equal than max interval");
76 if (minInterval < milliseconds::zero()) {
77 throw std::invalid_argument(
79 "time interval must be non-negative");
83 milliseconds operator()() { return milliseconds(dist(generator)); }
88 FunctionScheduler::FunctionScheduler() {}
90 FunctionScheduler::~FunctionScheduler() {
91 // make sure to stop the thread (if running)
95 void FunctionScheduler::addFunction(Function<void()>&& cb,
96 milliseconds interval,
98 milliseconds startDelay) {
101 ConstIntervalFunctor(interval),
103 to<std::string>(interval.count(), "ms"),
108 void FunctionScheduler::addFunction(Function<void()>&& cb,
109 milliseconds interval,
110 const LatencyDistribution& latencyDistr,
112 milliseconds startDelay) {
113 if (latencyDistr.isPoisson) {
116 PoissonDistributionFunctor(latencyDistr.poissonMean),
118 to<std::string>(latencyDistr.poissonMean, "ms (Poisson mean)"),
122 addFunction(std::move(cb), interval, nameID, startDelay);
126 void FunctionScheduler::addFunctionOnce(
127 Function<void()>&& cb,
129 milliseconds startDelay) {
132 ConstIntervalFunctor(milliseconds::zero()),
139 void FunctionScheduler::addFunctionUniformDistribution(
140 Function<void()>&& cb,
141 milliseconds minInterval,
142 milliseconds maxInterval,
144 milliseconds startDelay) {
147 UniformDistributionFunctor(minInterval, maxInterval),
150 "[", minInterval.count(), " , ", maxInterval.count(), "] ms"),
155 void FunctionScheduler::addFunctionGenericDistribution(
156 Function<void()>&& cb,
157 IntervalDistributionFunc&& intervalFunc,
158 const std::string& nameID,
159 const std::string& intervalDescr,
160 milliseconds startDelay) {
163 std::move(intervalFunc),
170 void FunctionScheduler::addFunctionInternal(
171 Function<void()>&& cb,
172 IntervalDistributionFunc&& intervalFunc,
173 const std::string& nameID,
174 const std::string& intervalDescr,
175 milliseconds startDelay,
178 throw std::invalid_argument(
179 "FunctionScheduler: Scheduled function must be set");
182 throw std::invalid_argument(
183 "FunctionScheduler: interval distribution function must be set");
185 if (startDelay < milliseconds::zero()) {
186 throw std::invalid_argument(
187 "FunctionScheduler: start delay must be non-negative");
190 std::unique_lock<std::mutex> l(mutex_);
191 auto it = functionsMap_.find(nameID);
192 // check if the nameID is unique
193 if (it != functionsMap_.end() && it->second->isValid()) {
194 throw std::invalid_argument(to<std::string>(
195 "FunctionScheduler: a function named \"", nameID, "\" already exists"));
198 if (currentFunction_ && currentFunction_->name == nameID) {
199 throw std::invalid_argument(to<std::string>(
200 "FunctionScheduler: a function named \"", nameID, "\" already exists"));
205 std::make_unique<RepeatFunc>(
207 std::move(intervalFunc),
214 bool FunctionScheduler::cancelFunctionWithLock(
215 std::unique_lock<std::mutex>& lock,
216 StringPiece nameID) {
217 CHECK_EQ(lock.owns_lock(), true);
218 if (currentFunction_ && currentFunction_->name == nameID) {
219 functionsMap_.erase(currentFunction_->name);
220 // This function is currently being run. Clear currentFunction_
221 // The running thread will see this and won't reschedule the function.
222 currentFunction_ = nullptr;
223 cancellingCurrentFunction_ = true;
229 bool FunctionScheduler::cancelFunction(StringPiece nameID) {
230 std::unique_lock<std::mutex> l(mutex_);
231 if (cancelFunctionWithLock(l, nameID)) {
234 auto it = functionsMap_.find(nameID);
235 if (it != functionsMap_.end() && it->second->isValid()) {
236 cancelFunction(l, it->second);
243 bool FunctionScheduler::cancelFunctionAndWait(StringPiece nameID) {
244 std::unique_lock<std::mutex> l(mutex_);
246 if (cancelFunctionWithLock(l, nameID)) {
247 runningCondvar_.wait(l, [this]() { return !cancellingCurrentFunction_; });
251 auto it = functionsMap_.find(nameID);
252 if (it != functionsMap_.end() && it->second->isValid()) {
253 cancelFunction(l, it->second);
259 void FunctionScheduler::cancelFunction(const std::unique_lock<std::mutex>& l,
261 // This function should only be called with mutex_ already locked.
262 DCHECK(l.mutex() == &mutex_);
263 DCHECK(l.owns_lock());
264 functionsMap_.erase(it->name);
268 bool FunctionScheduler::cancelAllFunctionsWithLock(
269 std::unique_lock<std::mutex>& lock) {
270 CHECK_EQ(lock.owns_lock(), true);
272 functionsMap_.clear();
273 if (currentFunction_) {
274 cancellingCurrentFunction_ = true;
276 currentFunction_ = nullptr;
277 return cancellingCurrentFunction_;
280 void FunctionScheduler::cancelAllFunctions() {
281 std::unique_lock<std::mutex> l(mutex_);
282 cancelAllFunctionsWithLock(l);
285 void FunctionScheduler::cancelAllFunctionsAndWait() {
286 std::unique_lock<std::mutex> l(mutex_);
287 if (cancelAllFunctionsWithLock(l)) {
288 runningCondvar_.wait(l, [this]() { return !cancellingCurrentFunction_; });
292 bool FunctionScheduler::resetFunctionTimer(StringPiece nameID) {
293 std::unique_lock<std::mutex> l(mutex_);
294 if (currentFunction_ && currentFunction_->name == nameID) {
295 if (cancellingCurrentFunction_ || currentFunction_->runOnce) {
298 currentFunction_->resetNextRunTime(steady_clock::now());
302 // Since __adjust_heap() isn't a part of the standard API, there's no way to
303 // fix the heap ordering if we adjust the key (nextRunTime) for the existing
304 // RepeatFunc. Instead, we just cancel it and add an identical object.
305 auto it = functionsMap_.find(nameID);
307 if (it != functionsMap_.end() && it->second->isValid()) {
308 auto funcCopy = std::make_unique<RepeatFunc>(std::move(*(it->second)));
309 it->second->cancel();
310 // This will take care of making sure that functionsMap_[it->first] =
312 addFunctionToHeap(l, std::move(funcCopy));
318 bool FunctionScheduler::start() {
319 std::unique_lock<std::mutex> l(mutex_);
324 VLOG(1) << "Starting FunctionScheduler with " << functions_.size()
326 auto now = steady_clock::now();
327 // Reset the next run time. for all functions.
328 // note: this is needed since one can shutdown() and start() again
329 for (const auto& f : functions_) {
330 f->resetNextRunTime(now);
331 VLOG(1) << " - func: " << (f->name.empty() ? "(anon)" : f->name.c_str())
332 << ", period = " << f->intervalDescr
333 << ", delay = " << f->startDelay.count() << "ms";
335 std::make_heap(functions_.begin(), functions_.end(), fnCmp_);
337 thread_ = std::thread([&] { this->run(); });
343 bool FunctionScheduler::shutdown() {
345 std::lock_guard<std::mutex> g(mutex_);
351 runningCondvar_.notify_one();
357 void FunctionScheduler::run() {
358 std::unique_lock<std::mutex> lock(mutex_);
360 if (!threadName_.empty()) {
361 folly::setThreadName(threadName_);
365 // If we have nothing to run, wait until a function is added or until we
367 if (functions_.empty()) {
368 runningCondvar_.wait(lock);
372 auto now = steady_clock::now();
374 // Move the next function to run to the end of functions_
375 std::pop_heap(functions_.begin(), functions_.end(), fnCmp_);
377 // Check to see if the function was cancelled.
378 // If so, just remove it and continue around the loop.
379 if (!functions_.back()->isValid()) {
380 functions_.pop_back();
384 auto sleepTime = functions_.back()->getNextRunTime() - now;
385 if (sleepTime < milliseconds::zero()) {
386 // We need to run this function now
387 runOneFunction(lock, now);
388 runningCondvar_.notify_all();
390 // Re-add the function to the heap, and wait until we actually
392 std::push_heap(functions_.begin(), functions_.end(), fnCmp_);
393 runningCondvar_.wait_for(lock, sleepTime);
398 void FunctionScheduler::runOneFunction(std::unique_lock<std::mutex>& lock,
399 steady_clock::time_point now) {
400 DCHECK(lock.mutex() == &mutex_);
401 DCHECK(lock.owns_lock());
403 // The function to run will be at the end of functions_ already.
405 // Fully remove it from functions_ now.
406 // We need to release mutex_ while we invoke this function, and we need to
407 // maintain the heap property on functions_ while mutex_ is unlocked.
408 auto func = std::move(functions_.back());
409 functions_.pop_back();
411 VLOG(5) << func->name << "function has been canceled while waiting";
414 currentFunction_ = func.get();
415 // Update the function's next run time.
417 // This allows scheduler to catch up
418 func->setNextRunTimeSteady();
420 // Note that we set nextRunTime based on the current time where we started
421 // the function call, rather than the time when the function finishes.
422 // This ensures that we call the function once every time interval, as
423 // opposed to waiting time interval seconds between calls. (These can be
424 // different if the function takes a significant amount of time to run.)
425 func->setNextRunTimeStrict(now);
428 // Release the lock while we invoke the user's function
431 // Invoke the function
433 VLOG(5) << "Now running " << func->name;
435 } catch (const std::exception& ex) {
436 LOG(ERROR) << "Error running the scheduled function <"
437 << func->name << ">: " << exceptionStr(ex);
440 // Re-acquire the lock
443 if (!currentFunction_) {
444 // The function was cancelled while we were running it.
445 // We shouldn't reschedule it;
446 cancellingCurrentFunction_ = false;
449 if (currentFunction_->runOnce) {
450 // Don't reschedule if the function only needed to run once.
451 functionsMap_.erase(currentFunction_->name);
452 currentFunction_ = nullptr;
456 // Re-insert the function into our functions_ heap.
457 // We only maintain the heap property while running_ is set. (running_ may
458 // have been cleared while we were invoking the user's function.)
459 functions_.push_back(std::move(func));
461 // Clear currentFunction_
462 currentFunction_ = nullptr;
465 std::push_heap(functions_.begin(), functions_.end(), fnCmp_);
469 void FunctionScheduler::addFunctionToHeap(
470 const std::unique_lock<std::mutex>& lock,
471 std::unique_ptr<RepeatFunc> func) {
472 // This function should only be called with mutex_ already locked.
473 DCHECK(lock.mutex() == &mutex_);
474 DCHECK(lock.owns_lock());
476 functions_.push_back(std::move(func));
477 functionsMap_[functions_.back()->name] = functions_.back().get();
479 functions_.back()->resetNextRunTime(steady_clock::now());
480 std::push_heap(functions_.begin(), functions_.end(), fnCmp_);
481 // Signal the running thread to wake up and see if it needs to change
482 // its current scheduling decision.
483 runningCondvar_.notify_one();
487 void FunctionScheduler::setThreadName(StringPiece threadName) {
488 std::unique_lock<std::mutex> l(mutex_);
489 threadName_ = threadName.str();