folly::FunctionScheduler: Adding support for uniform interval distribution
authorEugene Pekurovsky <pekurovsky@fb.com>
Wed, 9 Sep 2015 19:33:59 +0000 (12:33 -0700)
committerfacebook-github-bot-4 <folly-bot@fb.com>
Wed, 9 Sep 2015 20:20:18 +0000 (13:20 -0700)
Summary: 1) Added uniform interval distribution functionality.
2) Added a generic API for custom interval distribution algorithms.
3) Fixed an issue with removing a canceled function.
4) Did some code cleanup along the way.

Reviewed By: @​kaanb

Differential Revision: D2339911

folly/experimental/FunctionScheduler.cpp
folly/experimental/FunctionScheduler.h
folly/experimental/test/FunctionSchedulerTest.cpp

index e9ea7543c1eb9f4ec3263239f9e1925c8da0b673..69af28abc2d0b0fdee2733cc5063fff60e8f0c66 100644 (file)
  */
 
 #include <folly/experimental/FunctionScheduler.h>
-#include <folly/ThreadName.h>
+
+#include <random>
+
 #include <folly/Conv.h>
+#include <folly/Random.h>
 #include <folly/String.h>
+#include <folly/ThreadName.h>
 
-using namespace std;
 using std::chrono::milliseconds;
 using std::chrono::steady_clock;
 
 namespace folly {
 
-FunctionScheduler::FunctionScheduler() {
-}
+namespace {
+
+struct ConstIntervalFunctor {
+  const milliseconds constInterval;
+
+  explicit ConstIntervalFunctor(milliseconds interval)
+      : constInterval(interval) {
+    if (interval < milliseconds::zero()) {
+      throw std::invalid_argument(
+          "FunctionScheduler: "
+          "time interval must be non-negative");
+    }
+  }
+
+  milliseconds operator()() const { return constInterval; }
+};
+
+struct PoissonDistributionFunctor {
+  std::default_random_engine generator;
+  std::poisson_distribution<int> poissonRandom;
+
+  explicit PoissonDistributionFunctor(double meanPoissonMs)
+      : poissonRandom(meanPoissonMs) {
+    if (meanPoissonMs < 0.0) {
+      throw std::invalid_argument(
+          "FunctionScheduler: "
+          "Poisson mean interval must be non-negative");
+    }
+  }
+
+  milliseconds operator()() { return milliseconds(poissonRandom(generator)); }
+};
+
+struct UniformDistributionFunctor {
+  std::default_random_engine generator;
+  std::uniform_int_distribution<> dist;
+
+  UniformDistributionFunctor(milliseconds minInterval, milliseconds maxInterval)
+      : generator(Random::rand32()),
+        dist(minInterval.count(), maxInterval.count()) {
+    if (minInterval > maxInterval) {
+      throw std::invalid_argument(
+          "FunctionScheduler: "
+          "min time interval must be less or equal than max interval");
+    }
+    if (minInterval < milliseconds::zero()) {
+      throw std::invalid_argument(
+          "FunctionScheduler: "
+          "time interval must be non-negative");
+    }
+  }
+
+  milliseconds operator()() { return milliseconds(dist(generator)); }
+};
+
+} // anonymous namespace
+
+FunctionScheduler::FunctionScheduler() {}
 
 FunctionScheduler::~FunctionScheduler() {
   // make sure to stop the thread (if running)
@@ -37,8 +96,12 @@ void FunctionScheduler::addFunction(const std::function<void()>& cb,
                                     milliseconds interval,
                                     StringPiece nameID,
                                     milliseconds startDelay) {
-  LatencyDistribution latencyDistr(false, 0.0);
-  addFunction(cb, interval, latencyDistr, nameID, startDelay);
+  addFunctionGenericDistribution(
+      cb,
+      IntervalDistributionFunc(ConstIntervalFunctor(interval)),
+      nameID.str(),
+      to<std::string>(interval.count(), "ms"),
+      startDelay);
 }
 
 void FunctionScheduler::addFunction(const std::function<void()>& cb,
@@ -46,34 +109,72 @@ void FunctionScheduler::addFunction(const std::function<void()>& cb,
                                     const LatencyDistribution& latencyDistr,
                                     StringPiece nameID,
                                     milliseconds startDelay) {
-  if (interval < milliseconds::zero()) {
-    throw std::invalid_argument("FunctionScheduler: "
-                                "time interval must be non-negative");
+  if (latencyDistr.isPoisson) {
+    addFunctionGenericDistribution(
+        cb,
+        IntervalDistributionFunc(
+            PoissonDistributionFunctor(latencyDistr.poissonMean)),
+        nameID.str(),
+        to<std::string>(latencyDistr.poissonMean, "ms (Poisson mean)"),
+        startDelay);
+  } else {
+    addFunction(cb, interval, nameID, startDelay);
+  }
+}
+
+void FunctionScheduler::addFunctionUniformDistribution(
+    const std::function<void()>& cb,
+    milliseconds minInterval,
+    milliseconds maxInterval,
+    StringPiece nameID,
+    milliseconds startDelay) {
+  addFunctionGenericDistribution(
+      cb,
+      IntervalDistributionFunc(
+          UniformDistributionFunctor(minInterval, maxInterval)),
+      nameID.str(),
+      to<std::string>(
+          "[", minInterval.count(), " , ", maxInterval.count(), "] ms"),
+      startDelay);
+}
+
+void FunctionScheduler::addFunctionGenericDistribution(
+    const std::function<void()>& cb,
+    const IntervalDistributionFunc& intervalFunc,
+    const std::string& nameID,
+    const std::string& intervalDescr,
+    milliseconds startDelay) {
+  if (!cb) {
+    throw std::invalid_argument(
+        "FunctionScheduler: Scheduled function must be set");
+  }
+  if (!intervalFunc) {
+    throw std::invalid_argument(
+        "FunctionScheduler: interval distribution function must be set");
   }
   if (startDelay < milliseconds::zero()) {
-    throw std::invalid_argument("FunctionScheduler: "
-                                "start delay must be non-negative");
+    throw std::invalid_argument(
+        "FunctionScheduler: start delay must be non-negative");
   }
 
   std::lock_guard<std::mutex> l(mutex_);
   // check if the nameID is unique
   for (const auto& f : functions_) {
     if (f.isValid() && f.name == nameID) {
-      throw std::invalid_argument(to<string>(
-            "FunctionScheduler: a function named \"", nameID,
-            "\" already exists"));
+      throw std::invalid_argument(
+          to<std::string>("FunctionScheduler: a function named \"",
+                          nameID,
+                          "\" already exists"));
     }
   }
   if (currentFunction_ && currentFunction_->name == nameID) {
-    throw std::invalid_argument(to<string>(
-          "FunctionScheduler: a function named \"", nameID,
-          "\" already exists"));
+    throw std::invalid_argument(to<std::string>(
+        "FunctionScheduler: a function named \"", nameID, "\" already exists"));
   }
 
-  functions_.emplace_back(cb, interval, nameID.str(), startDelay,
-                          latencyDistr.isPoisson, latencyDistr.poissonMean);
+  functions_.emplace_back(cb, intervalFunc, nameID, intervalDescr, startDelay);
   if (running_) {
-    functions_.back().setNextRunTime(steady_clock::now() + startDelay);
+    functions_.back().resetNextRunTime(steady_clock::now());
     std::push_heap(functions_.begin(), functions_.end(), fnCmp_);
     // Signal the running thread to wake up and see if it needs to change it's
     // current scheduling decision.
@@ -140,11 +241,10 @@ bool FunctionScheduler::start() {
   // Reset the next run time. for all functions.
   // note: this is needed since one can shutdown() and start() again
   for (auto& f : functions_) {
-    f.setNextRunTime(now + f.startDelay);
-    VLOG(1) << "   - func: "
-            << (f.name.empty() ? "(anon)" : f.name.c_str())
-            << ", period = " << f.timeInterval.count()
-            << "ms, delay = " << f.startDelay.count() << "ms";
+    f.resetNextRunTime(now);
+    VLOG(1) << "   - func: " << (f.name.empty() ? "(anon)" : f.name.c_str())
+            << ", period = " << f.intervalDescr
+            << ", delay = " << f.startDelay.count() << "ms";
   }
   std::make_heap(functions_.begin(), functions_.end(), fnCmp_);
 
@@ -217,19 +317,23 @@ void FunctionScheduler::runOneFunction(std::unique_lock<std::mutex>& lock,
   // maintain the heap property on functions_ while mutex_ is unlocked.
   RepeatFunc func(std::move(functions_.back()));
   functions_.pop_back();
+  if (!func.cb) {
+    VLOG(5) << func.name << "function has been canceled while waiting";
+    return;
+  }
   currentFunction_ = &func;
 
-  // Update the function's run time, and re-insert it into the heap.
+  // Update the function's next run time.
   if (steady_) {
     // This allows scheduler to catch up
-    func.lastRunTime += func.timeInterval;
+    func.setNextRunTimeSteady();
   } else {
-    // Note that we adjust lastRunTime to the current time where we started the
-    // function call, rather than the time when the function finishes.
+    // Note that we set nextRunTime based on the current time where we started
+    // the function call, rather than the time when the function finishes.
     // This ensures that we call the function once every time interval, as
     // opposed to waiting time interval seconds between calls.  (These can be
     // different if the function takes a significant amount of time to run.)
-    func.lastRunTime = now;
+    func.setNextRunTimeStrict(now);
   }
 
   // Release the lock while we invoke the user's function
@@ -259,9 +363,6 @@ void FunctionScheduler::runOneFunction(std::unique_lock<std::mutex>& lock,
   // Re-insert the function into our functions_ heap.
   // We only maintain the heap property while running_ is set.  (running_ may
   // have been cleared while we were invoking the user's function.)
-  if (func.isPoissonDistr) {
-    func.setTimeIntervalPoissonDistr();
-  }
   functions_.push_back(std::move(func));
   if (running_) {
     std::push_heap(functions_.begin(), functions_.end(), fnCmp_);
index 1e8802fe06b6974e50eb375e9b67249e39ae2892..5ccbbcd598d1e970aca8cc9414a6399c513380ac 100644 (file)
@@ -23,7 +23,6 @@
 #include <mutex>
 #include <thread>
 #include <vector>
-#include <random>
 
 namespace folly {
 
@@ -104,12 +103,45 @@ class FunctionScheduler {
    * Add a new function to the FunctionScheduler with a specified
    * LatencyDistribution
    */
-  void addFunction(const std::function<void()>& cb,
-                   std::chrono::milliseconds interval,
-                   const LatencyDistribution& latencyDistr,
-                   StringPiece nameID = StringPiece(),
-                   std::chrono::milliseconds startDelay =
-                      std::chrono::milliseconds(0));
+  void addFunction(
+      const std::function<void()>& cb,
+      std::chrono::milliseconds interval,
+      const LatencyDistribution& latencyDistr,
+      StringPiece nameID = StringPiece(),
+      std::chrono::milliseconds startDelay = std::chrono::milliseconds(0));
+
+  /**
+    * Add a new function to the FunctionScheduler with the time
+    * interval being distributed uniformly within the given interval
+    * [minInterval, maxInterval].
+    */
+  void addFunctionUniformDistribution(const std::function<void()>& cb,
+                                      std::chrono::milliseconds minInterval,
+                                      std::chrono::milliseconds maxInterval,
+                                      StringPiece nameID,
+                                      std::chrono::milliseconds startDelay);
+
+  /**
+   * A type alias for function that is called to determine the time
+   * interval for the next scheduled run.
+   */
+  using IntervalDistributionFunc = std::function<std::chrono::milliseconds()>;
+
+  /**
+   * Add a new function to the FunctionScheduler. The scheduling interval
+   * is determined by the interval distribution functor, which is called
+   * every time the next function execution is scheduled. This allows
+   * for supporting custom interval distribution algorithms in addition
+   * to built in constant interval; and Poisson and jitter distributions
+   * (@see FunctionScheduler::addFunction and
+   * @see FunctionScheduler::addFunctionJitterInterval).
+   */
+  void addFunctionGenericDistribution(
+      const std::function<void()>& cb,
+      const IntervalDistributionFunc& intervalFunc,
+      const std::string& nameID,
+      const std::string& intervalDescr,
+      std::chrono::milliseconds startDelay);
 
   /**
    * Cancels the function with the specified name, so it will no longer be run.
@@ -142,63 +174,56 @@ class FunctionScheduler {
    */
   void setThreadName(StringPiece threadName);
 
-
  private:
   struct RepeatFunc {
     std::function<void()> cb;
-    std::chrono::milliseconds timeInterval;
-    std::chrono::steady_clock::time_point lastRunTime;
+    IntervalDistributionFunc intervalFunc;
+    std::chrono::steady_clock::time_point nextRunTime;
     std::string name;
     std::chrono::milliseconds startDelay;
-    bool isPoissonDistr;
-    std::default_random_engine generator;
-    std::poisson_distribution<int> poisson_random;
+    std::string intervalDescr;
 
     RepeatFunc(const std::function<void()>& cback,
-               std::chrono::milliseconds interval,
+               const IntervalDistributionFunc& intervalFn,
                const std::string& nameID,
-               std::chrono::milliseconds delay,
-               bool poisson = false,
-               double meanPoisson = 1.0)
-      : cb(cback),
-        timeInterval(interval),
-        lastRunTime(),
-        name(nameID),
-        startDelay(delay),
-        isPoissonDistr(poisson),
-        poisson_random(meanPoisson) {
-    }
+               const std::string& intervalDistDescription,
+               std::chrono::milliseconds delay)
+        : cb(cback),
+          intervalFunc(intervalFn),
+          nextRunTime(),
+          name(nameID),
+          startDelay(delay),
+          intervalDescr(intervalDistDescription) {}
 
     std::chrono::steady_clock::time_point getNextRunTime() const {
-      return lastRunTime + timeInterval;
+      return nextRunTime;
     }
-    void setNextRunTime(std::chrono::steady_clock::time_point time) {
-      lastRunTime = time - timeInterval;
+    void setNextRunTimeStrict(std::chrono::steady_clock::time_point curTime) {
+      nextRunTime = curTime + intervalFunc();
     }
-    void setTimeIntervalPoissonDistr() {
-      if (isPoissonDistr) {
-        timeInterval = std::chrono::milliseconds(poisson_random(generator));
-      }
+    void setNextRunTimeSteady() { nextRunTime += intervalFunc(); }
+    void resetNextRunTime(std::chrono::steady_clock::time_point curTime) {
+      nextRunTime = curTime + startDelay;
     }
     void cancel() {
       // Simply reset cb to an empty function.
       cb = std::function<void()>();
     }
-    bool isValid() const {
-      return bool(cb);
-    }
+    bool isValid() const { return bool(cb); }
   };
+
   struct RunTimeOrder {
     bool operator()(const RepeatFunc& f1, const RepeatFunc& f2) const {
       return f1.getNextRunTime() > f2.getNextRunTime();
     }
   };
+
   typedef std::vector<RepeatFunc> FunctionHeap;
 
   void run();
   void runOneFunction(std::unique_lock<std::mutex>& lock,
                       std::chrono::steady_clock::time_point now);
-  void cancelFunction(const std::unique_lock<std::mutex> &lock,
+  void cancelFunction(const std::unique_lock<std::mutex>lock,
                       FunctionHeap::iterator it);
 
   std::thread thread_;
index ebe0caf6e420c86765962069765dbf0c24d499ac..466b09417dee3e14c14b37f61cbb51e123135add 100644 (file)
  */
 #include <folly/experimental/FunctionScheduler.h>
 
+#include <algorithm>
 #include <atomic>
+#include <cassert>
+#include <random>
+#include <folly/Random.h>
 #include <gtest/gtest.h>
 
 using namespace folly;
@@ -31,8 +35,12 @@ namespace {
  * to run.
  */
 static const auto timeFactor = std::chrono::milliseconds(100);
-std::chrono::milliseconds testInterval(int n) {
-  return n * timeFactor;
+std::chrono::milliseconds testInterval(int n) { return n * timeFactor; }
+int getTicksWithinRange(int n, int min, int max) {
+  assert(min <= max);
+  n = std::max(min, n);
+  n = std::min(max, n);
+  return n;
 }
 void delay(int n) {
   std::chrono::microseconds usec(n * timeFactor);
@@ -321,3 +329,86 @@ TEST(FunctionScheduler, SteadyCatchup) {
   // enough to catch back up to schedule
   EXPECT_NEAR(100, ticks.load(), 10);
 }
+
+TEST(FunctionScheduler, UniformDistribution) {
+  int total = 0;
+  const int kTicks = 2;
+  std::chrono::milliseconds minInterval =
+      testInterval(kTicks) - (timeFactor / 5);
+  std::chrono::milliseconds maxInterval =
+      testInterval(kTicks) + (timeFactor / 5);
+  FunctionScheduler fs;
+  fs.addFunctionUniformDistribution([&] { total += 2; },
+                                    minInterval,
+                                    maxInterval,
+                                    "UniformDistribution",
+                                    std::chrono::milliseconds(0));
+  fs.start();
+  delay(1);
+  EXPECT_EQ(2, total);
+  delay(kTicks);
+  EXPECT_EQ(4, total);
+  delay(kTicks);
+  EXPECT_EQ(6, total);
+  fs.shutdown();
+  delay(2);
+  EXPECT_EQ(6, total);
+}
+
+TEST(FunctionScheduler, ExponentialBackoff) {
+  int total = 0;
+  int expectedInterval = 0;
+  int nextInterval = 2;
+  FunctionScheduler fs;
+  fs.addFunctionGenericDistribution(
+      [&] { total += 2; },
+      [&expectedInterval, nextInterval]() mutable {
+        expectedInterval = nextInterval;
+        nextInterval *= nextInterval;
+        return testInterval(expectedInterval);
+      },
+      "ExponentialBackoff",
+      "2^n * 100ms",
+      std::chrono::milliseconds(0));
+  fs.start();
+  delay(1);
+  EXPECT_EQ(2, total);
+  delay(expectedInterval);
+  EXPECT_EQ(4, total);
+  delay(expectedInterval);
+  EXPECT_EQ(6, total);
+  fs.shutdown();
+  delay(2);
+  EXPECT_EQ(6, total);
+}
+
+TEST(FunctionScheduler, GammaIntervalDistribution) {
+  int total = 0;
+  int expectedInterval = 0;
+  FunctionScheduler fs;
+  std::default_random_engine generator(folly::Random::rand32());
+  // The alpha and beta arguments are selected, somewhat randomly, to be 2.0.
+  // These values do not matter much in this test, as we are not testing the
+  // std::gamma_distribution itself...
+  std::gamma_distribution<double> gamma(2.0, 2.0);
+  fs.addFunctionGenericDistribution(
+      [&] { total += 2; },
+      [&expectedInterval, generator, gamma]() mutable {
+        expectedInterval =
+            getTicksWithinRange(static_cast<int>(gamma(generator)), 2, 10);
+        return testInterval(expectedInterval);
+      },
+      "GammaDistribution",
+      "gamma(2.0,2.0)*100ms",
+      std::chrono::milliseconds(0));
+  fs.start();
+  delay(1);
+  EXPECT_EQ(2, total);
+  delay(expectedInterval);
+  EXPECT_EQ(4, total);
+  delay(expectedInterval);
+  EXPECT_EQ(6, total);
+  fs.shutdown();
+  delay(2);
+  EXPECT_EQ(6, total);
+}