/*
- * Copyright 2015 Facebook, Inc.
+ * Copyright 2017 Facebook, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* limitations under the License.
*/
-#ifndef FOLLY_EXPERIMENTAL_FUNCTION_SCHEDULER_H_
-#define FOLLY_EXPERIMENTAL_FUNCTION_SCHEDULER_H_
+#pragma once
+#include <folly/Function.h>
#include <folly/Range.h>
+#include <folly/Hash.h>
#include <chrono>
#include <condition_variable>
#include <mutex>
#include <thread>
#include <vector>
-#include <random>
+#include <unordered_map>
namespace folly {
*
*
* Note: the class uses only one thread - if you want to use more than one
- * thread use multiple FunctionScheduler objects
+ * thread, either use multiple FunctionScheduler objects, or check out
+ * ThreadedRepeatingFunctionRunner.h for a much simpler contract of
+ * "run each function periodically in its own thread".
*
* start() schedules the functions, while shutdown() terminates further
* scheduling.
*/
void setSteady(bool steady) { steady_ = steady; }
+ /*
+ * Parameters to control the function interval.
+ *
+ * If isPoisson is true, then use std::poisson_distribution to pick the
+ * interval between each invocation of the function.
+ *
+ * If isPoisson os false, then always use fixed the interval specified to
+ * addFunction().
+ */
struct LatencyDistribution {
bool isPoisson;
double poissonMean;
- LatencyDistribution(bool poisson,
- double mean)
+ LatencyDistribution(bool poisson, double mean)
: isPoisson(poisson),
poissonMean(mean) {
}
-
};
/**
* Throws an exception on error. In particular, each function must have a
* unique name--two functions cannot be added with the same name.
*/
- void addFunction(const std::function<void()>& cb,
+ void addFunction(Function<void()>&& cb,
std::chrono::milliseconds interval,
StringPiece nameID = StringPiece(),
std::chrono::milliseconds startDelay =
std::chrono::milliseconds(0));
+ /*
+ * Add a new function to the FunctionScheduler with a specified
+ * LatencyDistribution
+ */
+ void addFunction(
+ Function<void()>&& cb,
+ std::chrono::milliseconds interval,
+ const LatencyDistribution& latencyDistr,
+ StringPiece nameID = StringPiece(),
+ std::chrono::milliseconds startDelay = std::chrono::milliseconds(0));
+
+ /**
+ * Adds a new function to the FunctionScheduler to run only once.
+ */
+ void addFunctionOnce(
+ Function<void()>&& cb,
+ StringPiece nameID = StringPiece(),
+ std::chrono::milliseconds startDelay = std::chrono::milliseconds(0));
+
+ /**
+ * Add a new function to the FunctionScheduler with the time
+ * interval being distributed uniformly within the given interval
+ * [minInterval, maxInterval].
+ */
+ void addFunctionUniformDistribution(Function<void()>&& cb,
+ std::chrono::milliseconds minInterval,
+ std::chrono::milliseconds maxInterval,
+ StringPiece nameID,
+ std::chrono::milliseconds startDelay);
+
+ /**
+ * A type alias for function that is called to determine the time
+ * interval for the next scheduled run.
+ */
+ using IntervalDistributionFunc = Function<std::chrono::milliseconds()>;
+
+ /**
+ * Add a new function to the FunctionScheduler. The scheduling interval
+ * is determined by the interval distribution functor, which is called
+ * every time the next function execution is scheduled. This allows
+ * for supporting custom interval distribution algorithms in addition
+ * to built in constant interval; and Poisson and jitter distributions
+ * (@see FunctionScheduler::addFunction and
+ * @see FunctionScheduler::addFunctionJitterInterval).
+ */
+ void addFunctionGenericDistribution(
+ Function<void()>&& cb,
+ IntervalDistributionFunc&& intervalFunc,
+ const std::string& nameID,
+ const std::string& intervalDescr,
+ std::chrono::milliseconds startDelay);
+
/**
* Cancels the function with the specified name, so it will no longer be run.
*
* Returns false if no function exists with the specified name.
*/
bool cancelFunction(StringPiece nameID);
+ bool cancelFunctionAndWait(StringPiece nameID);
/**
* All functions registered will be canceled.
*/
void cancelAllFunctions();
+ void cancelAllFunctionsAndWait();
+
+ /**
+ * Resets the specified function's timer.
+ * When resetFunctionTimer is called, the specified function's timer will
+ * be reset with the same parameters it was passed initially, including
+ * its startDelay. If the startDelay was 0, the function will be invoked
+ * immediately.
+ *
+ * Returns false if no function exists with the specified name.
+ */
+ bool resetFunctionTimer(StringPiece nameID);
/**
* Starts the scheduler.
* Stops the FunctionScheduler.
*
* It may be restarted later by calling start() again.
+ * Returns false if the scheduler was not running.
*/
- void shutdown();
+ bool shutdown();
/**
* Set the name of the worker thread.
*/
void setThreadName(StringPiece threadName);
-
private:
- void addFunctionInternal(const std::function<void()>& cb,
- std::chrono::milliseconds interval,
- const LatencyDistribution& latencyDistr,
- StringPiece nameID = StringPiece(),
- std::chrono::milliseconds startDelay =
- std::chrono::milliseconds(0));
struct RepeatFunc {
- std::function<void()> cb;
- std::chrono::milliseconds timeInterval;
- std::chrono::milliseconds lastRunTime;
+ Function<void()> cb;
+ IntervalDistributionFunc intervalFunc;
+ std::chrono::steady_clock::time_point nextRunTime;
std::string name;
std::chrono::milliseconds startDelay;
- bool isPoissonDistr;
- std::default_random_engine generator;
- std::poisson_distribution<int> poisson_random;
-
- RepeatFunc(const std::function<void()>& cback,
- std::chrono::milliseconds interval,
- const std::string& nameID,
- std::chrono::milliseconds delay,
- bool poisson = false,
- double meanPoisson = 1.0)
- : cb(cback),
- timeInterval(interval),
- lastRunTime(0),
- name(nameID),
- startDelay(delay),
- isPoissonDistr(poisson),
- poisson_random(meanPoisson) {
- }
+ std::string intervalDescr;
+ bool runOnce;
+
+ RepeatFunc(
+ Function<void()>&& cback,
+ IntervalDistributionFunc&& intervalFn,
+ const std::string& nameID,
+ const std::string& intervalDistDescription,
+ std::chrono::milliseconds delay,
+ bool once)
+ : cb(std::move(cback)),
+ intervalFunc(std::move(intervalFn)),
+ nextRunTime(),
+ name(nameID),
+ startDelay(delay),
+ intervalDescr(intervalDistDescription),
+ runOnce(once) {}
- std::chrono::milliseconds getNextRunTime() const {
- return lastRunTime + timeInterval;
+ std::chrono::steady_clock::time_point getNextRunTime() const {
+ return nextRunTime;
}
- void setNextRunTime(std::chrono::milliseconds time) {
- lastRunTime = time - timeInterval;
+ void setNextRunTimeStrict(std::chrono::steady_clock::time_point curTime) {
+ nextRunTime = curTime + intervalFunc();
}
- void setTimeIntervalPoissonDistr() {
- if (isPoissonDistr) {
- timeInterval = std::chrono::milliseconds(poisson_random(generator));
- }
+ void setNextRunTimeSteady() { nextRunTime += intervalFunc(); }
+ void resetNextRunTime(std::chrono::steady_clock::time_point curTime) {
+ nextRunTime = curTime + startDelay;
}
void cancel() {
// Simply reset cb to an empty function.
- cb = std::function<void()>();
- }
- bool isValid() const {
- return bool(cb);
+ cb = {};
}
+ bool isValid() const { return bool(cb); }
};
+
struct RunTimeOrder {
- bool operator()(const RepeatFunc& f1, const RepeatFunc& f2) const {
- return f1.getNextRunTime() > f2.getNextRunTime();
+ bool operator()(const std::unique_ptr<RepeatFunc>& f1, const std::unique_ptr<RepeatFunc>& f2) const {
+ return f1->getNextRunTime() > f2->getNextRunTime();
}
};
- typedef std::vector<RepeatFunc> FunctionHeap;
+
+ typedef std::vector<std::unique_ptr<RepeatFunc>> FunctionHeap;
+ typedef std::unordered_map<StringPiece, RepeatFunc*, Hash> FunctionMap;
void run();
void runOneFunction(std::unique_lock<std::mutex>& lock,
- std::chrono::milliseconds now);
- void cancelFunction(const std::unique_lock<std::mutex> &lock,
- FunctionHeap::iterator it);
+ std::chrono::steady_clock::time_point now);
+ void cancelFunction(const std::unique_lock<std::mutex>& lock,
+ RepeatFunc* it);
+ void addFunctionToHeap(const std::unique_lock<std::mutex>& lock,
+ std::unique_ptr<RepeatFunc> func);
+
+ void addFunctionInternal(
+ Function<void()>&& cb,
+ IntervalDistributionFunc&& intervalFunc,
+ const std::string& nameID,
+ const std::string& intervalDescr,
+ std::chrono::milliseconds startDelay,
+ bool runOnce);
+
+ // Return true if the current function is being canceled
+ bool cancelAllFunctionsWithLock(std::unique_lock<std::mutex>& lock);
+ bool cancelFunctionWithLock(
+ std::unique_lock<std::mutex>& lock,
+ StringPiece nameID);
std::thread thread_;
// The functions to run.
// This is a heap, ordered by next run time.
FunctionHeap functions_;
+ FunctionMap functionsMap_;
RunTimeOrder fnCmp_;
// The function currently being invoked by the running thread.
std::string threadName_;
bool steady_{false};
+ bool cancellingCurrentFunction_{false};
};
-}
-
-#endif
+} // namespace folly