Expected coroutines support
[folly.git] / folly / experimental / FunctionScheduler.h
index 08553fe5908450910464c6b59c4cf45e2a810fd8..59dc4bce1e7ec34b9af6f7924781f9f4455350f1 100644 (file)
@@ -1,5 +1,5 @@
 /*
- * Copyright 2015 Facebook, Inc.
+ * Copyright 2017 Facebook, Inc.
  *
  * Licensed under the Apache License, Version 2.0 (the "License");
  * you may not use this file except in compliance with the License.
  * limitations under the License.
  */
 
-#ifndef FOLLY_EXPERIMENTAL_FUNCTION_SCHEDULER_H_
-#define FOLLY_EXPERIMENTAL_FUNCTION_SCHEDULER_H_
+#pragma once
 
+#include <folly/Function.h>
 #include <folly/Range.h>
+#include <folly/Hash.h>
 #include <chrono>
 #include <condition_variable>
 #include <mutex>
 #include <thread>
 #include <vector>
-#include <random>
+#include <unordered_map>
 
 namespace folly {
 
@@ -43,7 +44,9 @@ namespace folly {
  *
  *
  * Note: the class uses only one thread - if you want to use more than one
- *       thread use multiple FunctionScheduler objects
+ *       thread, either use multiple FunctionScheduler objects, or check out
+ *       ThreadedRepeatingFunctionRunner.h for a much simpler contract of
+ *       "run each function periodically in its own thread".
  *
  * start() schedules the functions, while shutdown() terminates further
  * scheduling.
@@ -64,16 +67,23 @@ class FunctionScheduler {
    */
   void setSteady(bool steady) { steady_ = steady; }
 
+  /*
+   * Parameters to control the function interval.
+   *
+   * If isPoisson is true, then use std::poisson_distribution to pick the
+   * interval between each invocation of the function.
+   *
+   * If isPoisson os false, then always use fixed the interval specified to
+   * addFunction().
+   */
   struct LatencyDistribution {
     bool isPoisson;
     double poissonMean;
 
-    LatencyDistribution(bool poisson,
-                 double mean)
+    LatencyDistribution(bool poisson, double mean)
       : isPoisson(poisson),
         poissonMean(mean) {
     }
-
   };
 
   /**
@@ -87,23 +97,88 @@ class FunctionScheduler {
    * Throws an exception on error.  In particular, each function must have a
    * unique name--two functions cannot be added with the same name.
    */
-  void addFunction(const std::function<void()>& cb,
+  void addFunction(Function<void()>&& cb,
                    std::chrono::milliseconds interval,
                    StringPiece nameID = StringPiece(),
                    std::chrono::milliseconds startDelay =
                      std::chrono::milliseconds(0));
 
+  /*
+   * Add a new function to the FunctionScheduler with a specified
+   * LatencyDistribution
+   */
+  void addFunction(
+      Function<void()>&& cb,
+      std::chrono::milliseconds interval,
+      const LatencyDistribution& latencyDistr,
+      StringPiece nameID = StringPiece(),
+      std::chrono::milliseconds startDelay = std::chrono::milliseconds(0));
+
+  /**
+   * Adds a new function to the FunctionScheduler to run only once.
+   */
+  void addFunctionOnce(
+      Function<void()>&& cb,
+      StringPiece nameID = StringPiece(),
+      std::chrono::milliseconds startDelay = std::chrono::milliseconds(0));
+
+  /**
+    * Add a new function to the FunctionScheduler with the time
+    * interval being distributed uniformly within the given interval
+    * [minInterval, maxInterval].
+    */
+  void addFunctionUniformDistribution(Function<void()>&& cb,
+                                      std::chrono::milliseconds minInterval,
+                                      std::chrono::milliseconds maxInterval,
+                                      StringPiece nameID,
+                                      std::chrono::milliseconds startDelay);
+
+  /**
+   * A type alias for function that is called to determine the time
+   * interval for the next scheduled run.
+   */
+  using IntervalDistributionFunc = Function<std::chrono::milliseconds()>;
+
+  /**
+   * Add a new function to the FunctionScheduler. The scheduling interval
+   * is determined by the interval distribution functor, which is called
+   * every time the next function execution is scheduled. This allows
+   * for supporting custom interval distribution algorithms in addition
+   * to built in constant interval; and Poisson and jitter distributions
+   * (@see FunctionScheduler::addFunction and
+   * @see FunctionScheduler::addFunctionJitterInterval).
+   */
+  void addFunctionGenericDistribution(
+      Function<void()>&& cb,
+      IntervalDistributionFunc&& intervalFunc,
+      const std::string& nameID,
+      const std::string& intervalDescr,
+      std::chrono::milliseconds startDelay);
+
   /**
    * Cancels the function with the specified name, so it will no longer be run.
    *
    * Returns false if no function exists with the specified name.
    */
   bool cancelFunction(StringPiece nameID);
+  bool cancelFunctionAndWait(StringPiece nameID);
 
   /**
    * All functions registered will be canceled.
    */
   void cancelAllFunctions();
+  void cancelAllFunctionsAndWait();
+
+  /**
+   * Resets the specified function's timer.
+   * When resetFunctionTimer is called, the specified function's timer will
+   * be reset with the same parameters it was passed initially, including
+   * its startDelay. If the startDelay was 0, the function will be invoked
+   * immediately.
+   *
+   * Returns false if no function exists with the specified name.
+   */
+  bool resetFunctionTimer(StringPiece nameID);
 
   /**
    * Starts the scheduler.
@@ -116,78 +191,87 @@ class FunctionScheduler {
    * Stops the FunctionScheduler.
    *
    * It may be restarted later by calling start() again.
+   * Returns false if the scheduler was not running.
    */
-  void shutdown();
+  bool shutdown();
 
   /**
    * Set the name of the worker thread.
    */
   void setThreadName(StringPiece threadName);
 
-
  private:
-  void addFunctionInternal(const std::function<void()>& cb,
-                   std::chrono::milliseconds interval,
-                   const LatencyDistribution& latencyDistr,
-                   StringPiece nameID = StringPiece(),
-                   std::chrono::milliseconds startDelay =
-                      std::chrono::milliseconds(0));
   struct RepeatFunc {
-    std::function<void()> cb;
-    std::chrono::milliseconds timeInterval;
-    std::chrono::milliseconds lastRunTime;
+    Function<void()> cb;
+    IntervalDistributionFunc intervalFunc;
+    std::chrono::steady_clock::time_point nextRunTime;
     std::string name;
     std::chrono::milliseconds startDelay;
-    bool isPoissonDistr;
-    std::default_random_engine generator;
-    std::poisson_distribution<int> poisson_random;
-
-    RepeatFunc(const std::function<void()>& cback,
-               std::chrono::milliseconds interval,
-               const std::string& nameID,
-               std::chrono::milliseconds delay,
-               bool poisson = false,
-               double meanPoisson = 1.0)
-      : cb(cback),
-        timeInterval(interval),
-        lastRunTime(0),
-        name(nameID),
-        startDelay(delay),
-        isPoissonDistr(poisson),
-        poisson_random(meanPoisson) {
-    }
+    std::string intervalDescr;
+    bool runOnce;
+
+    RepeatFunc(
+        Function<void()>&& cback,
+        IntervalDistributionFunc&& intervalFn,
+        const std::string& nameID,
+        const std::string& intervalDistDescription,
+        std::chrono::milliseconds delay,
+        bool once)
+        : cb(std::move(cback)),
+          intervalFunc(std::move(intervalFn)),
+          nextRunTime(),
+          name(nameID),
+          startDelay(delay),
+          intervalDescr(intervalDistDescription),
+          runOnce(once) {}
 
-    std::chrono::milliseconds getNextRunTime() const {
-      return lastRunTime + timeInterval;
+    std::chrono::steady_clock::time_point getNextRunTime() const {
+      return nextRunTime;
     }
-    void setNextRunTime(std::chrono::milliseconds time) {
-      lastRunTime = time - timeInterval;
+    void setNextRunTimeStrict(std::chrono::steady_clock::time_point curTime) {
+      nextRunTime = curTime + intervalFunc();
     }
-    void setTimeIntervalPoissonDistr() {
-      if (isPoissonDistr) {
-        timeInterval = std::chrono::milliseconds(poisson_random(generator));
-      }
+    void setNextRunTimeSteady() { nextRunTime += intervalFunc(); }
+    void resetNextRunTime(std::chrono::steady_clock::time_point curTime) {
+      nextRunTime = curTime + startDelay;
     }
     void cancel() {
       // Simply reset cb to an empty function.
-      cb = std::function<void()>();
-    }
-    bool isValid() const {
-      return bool(cb);
+      cb = {};
     }
+    bool isValid() const { return bool(cb); }
   };
+
   struct RunTimeOrder {
-    bool operator()(const RepeatFunc& f1, const RepeatFunc& f2) const {
-      return f1.getNextRunTime() > f2.getNextRunTime();
+    bool operator()(const std::unique_ptr<RepeatFunc>& f1, const std::unique_ptr<RepeatFunc>& f2) const {
+      return f1->getNextRunTime() > f2->getNextRunTime();
     }
   };
-  typedef std::vector<RepeatFunc> FunctionHeap;
+
+  typedef std::vector<std::unique_ptr<RepeatFunc>> FunctionHeap;
+  typedef std::unordered_map<StringPiece, RepeatFunc*, Hash> FunctionMap;
 
   void run();
   void runOneFunction(std::unique_lock<std::mutex>& lock,
-                      std::chrono::milliseconds now);
-  void cancelFunction(const std::unique_lock<std::mutex> &lock,
-                      FunctionHeap::iterator it);
+                      std::chrono::steady_clock::time_point now);
+  void cancelFunction(const std::unique_lock<std::mutex>& lock,
+                      RepeatFunc* it);
+  void addFunctionToHeap(const std::unique_lock<std::mutex>& lock,
+                         std::unique_ptr<RepeatFunc> func);
+
+  void addFunctionInternal(
+      Function<void()>&& cb,
+      IntervalDistributionFunc&& intervalFunc,
+      const std::string& nameID,
+      const std::string& intervalDescr,
+      std::chrono::milliseconds startDelay,
+      bool runOnce);
+
+  // Return true if the current function is being canceled
+  bool cancelAllFunctionsWithLock(std::unique_lock<std::mutex>& lock);
+  bool cancelFunctionWithLock(
+      std::unique_lock<std::mutex>& lock,
+      StringPiece nameID);
 
   std::thread thread_;
 
@@ -198,6 +282,7 @@ class FunctionScheduler {
   // The functions to run.
   // This is a heap, ordered by next run time.
   FunctionHeap functions_;
+  FunctionMap functionsMap_;
   RunTimeOrder fnCmp_;
 
   // The function currently being invoked by the running thread.
@@ -210,8 +295,7 @@ class FunctionScheduler {
 
   std::string threadName_;
   bool steady_{false};
+  bool cancellingCurrentFunction_{false};
 };
 
-}
-
-#endif
+} // namespace folly