/*
- * Copyright 2015 Facebook, Inc.
+ * Copyright 2017 Facebook, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* limitations under the License.
*/
-#ifndef FOLLY_EXPERIMENTAL_FUNCTION_SCHEDULER_H_
-#define FOLLY_EXPERIMENTAL_FUNCTION_SCHEDULER_H_
+#pragma once
+#include <folly/Function.h>
#include <folly/Range.h>
+#include <folly/hash/Hash.h>
#include <chrono>
#include <condition_variable>
#include <mutex>
#include <thread>
+#include <unordered_map>
#include <vector>
namespace folly {
*
*
* Note: the class uses only one thread - if you want to use more than one
- * thread use multiple FunctionScheduler objects
+ * thread, either use multiple FunctionScheduler objects, or check out
+ * ThreadedRepeatingFunctionRunner.h for a much simpler contract of
+ * "run each function periodically in its own thread".
*
* start() schedules the functions, while shutdown() terminates further
* scheduling.
* Throws an exception on error. In particular, each function must have a
* unique name--two functions cannot be added with the same name.
*/
- void addFunction(const std::function<void()>& cb,
+ void addFunction(Function<void()>&& cb,
std::chrono::milliseconds interval,
StringPiece nameID = StringPiece(),
std::chrono::milliseconds startDelay =
* LatencyDistribution
*/
void addFunction(
- const std::function<void()>& cb,
+ Function<void()>&& cb,
std::chrono::milliseconds interval,
const LatencyDistribution& latencyDistr,
StringPiece nameID = StringPiece(),
std::chrono::milliseconds startDelay = std::chrono::milliseconds(0));
+ /**
+ * Adds a new function to the FunctionScheduler to run only once.
+ */
+ void addFunctionOnce(
+ Function<void()>&& cb,
+ StringPiece nameID = StringPiece(),
+ std::chrono::milliseconds startDelay = std::chrono::milliseconds(0));
+
/**
* Add a new function to the FunctionScheduler with the time
* interval being distributed uniformly within the given interval
* [minInterval, maxInterval].
*/
- void addFunctionUniformDistribution(const std::function<void()>& cb,
+ void addFunctionUniformDistribution(Function<void()>&& cb,
std::chrono::milliseconds minInterval,
std::chrono::milliseconds maxInterval,
StringPiece nameID,
* A type alias for function that is called to determine the time
* interval for the next scheduled run.
*/
- using IntervalDistributionFunc = std::function<std::chrono::milliseconds()>;
+ using IntervalDistributionFunc = Function<std::chrono::milliseconds()>;
/**
* Add a new function to the FunctionScheduler. The scheduling interval
* @see FunctionScheduler::addFunctionJitterInterval).
*/
void addFunctionGenericDistribution(
- const std::function<void()>& cb,
- const IntervalDistributionFunc& intervalFunc,
+ Function<void()>&& cb,
+ IntervalDistributionFunc&& intervalFunc,
const std::string& nameID,
const std::string& intervalDescr,
std::chrono::milliseconds startDelay);
* Returns false if no function exists with the specified name.
*/
bool cancelFunction(StringPiece nameID);
+ bool cancelFunctionAndWait(StringPiece nameID);
/**
* All functions registered will be canceled.
*/
void cancelAllFunctions();
+ void cancelAllFunctionsAndWait();
+
+ /**
+ * Resets the specified function's timer.
+ * When resetFunctionTimer is called, the specified function's timer will
+ * be reset with the same parameters it was passed initially, including
+ * its startDelay. If the startDelay was 0, the function will be invoked
+ * immediately.
+ *
+ * Returns false if no function exists with the specified name.
+ */
+ bool resetFunctionTimer(StringPiece nameID);
/**
* Starts the scheduler.
* Stops the FunctionScheduler.
*
* It may be restarted later by calling start() again.
+ * Returns false if the scheduler was not running.
*/
- void shutdown();
+ bool shutdown();
/**
* Set the name of the worker thread.
private:
struct RepeatFunc {
- std::function<void()> cb;
+ Function<void()> cb;
IntervalDistributionFunc intervalFunc;
std::chrono::steady_clock::time_point nextRunTime;
std::string name;
std::chrono::milliseconds startDelay;
std::string intervalDescr;
+ bool runOnce;
- RepeatFunc(const std::function<void()>& cback,
- const IntervalDistributionFunc& intervalFn,
- const std::string& nameID,
- const std::string& intervalDistDescription,
- std::chrono::milliseconds delay)
- : cb(cback),
- intervalFunc(intervalFn),
+ RepeatFunc(
+ Function<void()>&& cback,
+ IntervalDistributionFunc&& intervalFn,
+ const std::string& nameID,
+ const std::string& intervalDistDescription,
+ std::chrono::milliseconds delay,
+ bool once)
+ : cb(std::move(cback)),
+ intervalFunc(std::move(intervalFn)),
nextRunTime(),
name(nameID),
startDelay(delay),
- intervalDescr(intervalDistDescription) {}
+ intervalDescr(intervalDistDescription),
+ runOnce(once) {}
std::chrono::steady_clock::time_point getNextRunTime() const {
return nextRunTime;
}
void cancel() {
// Simply reset cb to an empty function.
- cb = std::function<void()>();
+ cb = {};
}
bool isValid() const { return bool(cb); }
};
struct RunTimeOrder {
- bool operator()(const RepeatFunc& f1, const RepeatFunc& f2) const {
- return f1.getNextRunTime() > f2.getNextRunTime();
+ bool operator()(const std::unique_ptr<RepeatFunc>& f1, const std::unique_ptr<RepeatFunc>& f2) const {
+ return f1->getNextRunTime() > f2->getNextRunTime();
}
};
- typedef std::vector<RepeatFunc> FunctionHeap;
+ typedef std::vector<std::unique_ptr<RepeatFunc>> FunctionHeap;
+ typedef std::unordered_map<StringPiece, RepeatFunc*, Hash> FunctionMap;
void run();
void runOneFunction(std::unique_lock<std::mutex>& lock,
std::chrono::steady_clock::time_point now);
void cancelFunction(const std::unique_lock<std::mutex>& lock,
- FunctionHeap::iterator it);
+ RepeatFunc* it);
+ void addFunctionToHeap(const std::unique_lock<std::mutex>& lock,
+ std::unique_ptr<RepeatFunc> func);
+
+ void addFunctionInternal(
+ Function<void()>&& cb,
+ IntervalDistributionFunc&& intervalFunc,
+ const std::string& nameID,
+ const std::string& intervalDescr,
+ std::chrono::milliseconds startDelay,
+ bool runOnce);
+
+ // Return true if the current function is being canceled
+ bool cancelAllFunctionsWithLock(std::unique_lock<std::mutex>& lock);
+ bool cancelFunctionWithLock(
+ std::unique_lock<std::mutex>& lock,
+ StringPiece nameID);
std::thread thread_;
// The functions to run.
// This is a heap, ordered by next run time.
FunctionHeap functions_;
+ FunctionMap functionsMap_;
RunTimeOrder fnCmp_;
// The function currently being invoked by the running thread.
std::string threadName_;
bool steady_{false};
+ bool cancellingCurrentFunction_{false};
};
-}
-
-#endif
+} // namespace folly