folly::FunctionScheduler: Adding support for uniform interval distribution
[folly.git] / folly / experimental / FunctionScheduler.cpp
index e9ea7543c1eb9f4ec3263239f9e1925c8da0b673..69af28abc2d0b0fdee2733cc5063fff60e8f0c66 100644 (file)
  */
 
 #include <folly/experimental/FunctionScheduler.h>
  */
 
 #include <folly/experimental/FunctionScheduler.h>
-#include <folly/ThreadName.h>
+
+#include <random>
+
 #include <folly/Conv.h>
 #include <folly/Conv.h>
+#include <folly/Random.h>
 #include <folly/String.h>
 #include <folly/String.h>
+#include <folly/ThreadName.h>
 
 
-using namespace std;
 using std::chrono::milliseconds;
 using std::chrono::steady_clock;
 
 namespace folly {
 
 using std::chrono::milliseconds;
 using std::chrono::steady_clock;
 
 namespace folly {
 
-FunctionScheduler::FunctionScheduler() {
-}
+namespace {
+
+struct ConstIntervalFunctor {
+  const milliseconds constInterval;
+
+  explicit ConstIntervalFunctor(milliseconds interval)
+      : constInterval(interval) {
+    if (interval < milliseconds::zero()) {
+      throw std::invalid_argument(
+          "FunctionScheduler: "
+          "time interval must be non-negative");
+    }
+  }
+
+  milliseconds operator()() const { return constInterval; }
+};
+
+struct PoissonDistributionFunctor {
+  std::default_random_engine generator;
+  std::poisson_distribution<int> poissonRandom;
+
+  explicit PoissonDistributionFunctor(double meanPoissonMs)
+      : poissonRandom(meanPoissonMs) {
+    if (meanPoissonMs < 0.0) {
+      throw std::invalid_argument(
+          "FunctionScheduler: "
+          "Poisson mean interval must be non-negative");
+    }
+  }
+
+  milliseconds operator()() { return milliseconds(poissonRandom(generator)); }
+};
+
+struct UniformDistributionFunctor {
+  std::default_random_engine generator;
+  std::uniform_int_distribution<> dist;
+
+  UniformDistributionFunctor(milliseconds minInterval, milliseconds maxInterval)
+      : generator(Random::rand32()),
+        dist(minInterval.count(), maxInterval.count()) {
+    if (minInterval > maxInterval) {
+      throw std::invalid_argument(
+          "FunctionScheduler: "
+          "min time interval must be less or equal than max interval");
+    }
+    if (minInterval < milliseconds::zero()) {
+      throw std::invalid_argument(
+          "FunctionScheduler: "
+          "time interval must be non-negative");
+    }
+  }
+
+  milliseconds operator()() { return milliseconds(dist(generator)); }
+};
+
+} // anonymous namespace
+
+FunctionScheduler::FunctionScheduler() {}
 
 FunctionScheduler::~FunctionScheduler() {
   // make sure to stop the thread (if running)
 
 FunctionScheduler::~FunctionScheduler() {
   // make sure to stop the thread (if running)
@@ -37,8 +96,12 @@ void FunctionScheduler::addFunction(const std::function<void()>& cb,
                                     milliseconds interval,
                                     StringPiece nameID,
                                     milliseconds startDelay) {
                                     milliseconds interval,
                                     StringPiece nameID,
                                     milliseconds startDelay) {
-  LatencyDistribution latencyDistr(false, 0.0);
-  addFunction(cb, interval, latencyDistr, nameID, startDelay);
+  addFunctionGenericDistribution(
+      cb,
+      IntervalDistributionFunc(ConstIntervalFunctor(interval)),
+      nameID.str(),
+      to<std::string>(interval.count(), "ms"),
+      startDelay);
 }
 
 void FunctionScheduler::addFunction(const std::function<void()>& cb,
 }
 
 void FunctionScheduler::addFunction(const std::function<void()>& cb,
@@ -46,34 +109,72 @@ void FunctionScheduler::addFunction(const std::function<void()>& cb,
                                     const LatencyDistribution& latencyDistr,
                                     StringPiece nameID,
                                     milliseconds startDelay) {
                                     const LatencyDistribution& latencyDistr,
                                     StringPiece nameID,
                                     milliseconds startDelay) {
-  if (interval < milliseconds::zero()) {
-    throw std::invalid_argument("FunctionScheduler: "
-                                "time interval must be non-negative");
+  if (latencyDistr.isPoisson) {
+    addFunctionGenericDistribution(
+        cb,
+        IntervalDistributionFunc(
+            PoissonDistributionFunctor(latencyDistr.poissonMean)),
+        nameID.str(),
+        to<std::string>(latencyDistr.poissonMean, "ms (Poisson mean)"),
+        startDelay);
+  } else {
+    addFunction(cb, interval, nameID, startDelay);
+  }
+}
+
+void FunctionScheduler::addFunctionUniformDistribution(
+    const std::function<void()>& cb,
+    milliseconds minInterval,
+    milliseconds maxInterval,
+    StringPiece nameID,
+    milliseconds startDelay) {
+  addFunctionGenericDistribution(
+      cb,
+      IntervalDistributionFunc(
+          UniformDistributionFunctor(minInterval, maxInterval)),
+      nameID.str(),
+      to<std::string>(
+          "[", minInterval.count(), " , ", maxInterval.count(), "] ms"),
+      startDelay);
+}
+
+void FunctionScheduler::addFunctionGenericDistribution(
+    const std::function<void()>& cb,
+    const IntervalDistributionFunc& intervalFunc,
+    const std::string& nameID,
+    const std::string& intervalDescr,
+    milliseconds startDelay) {
+  if (!cb) {
+    throw std::invalid_argument(
+        "FunctionScheduler: Scheduled function must be set");
+  }
+  if (!intervalFunc) {
+    throw std::invalid_argument(
+        "FunctionScheduler: interval distribution function must be set");
   }
   if (startDelay < milliseconds::zero()) {
   }
   if (startDelay < milliseconds::zero()) {
-    throw std::invalid_argument("FunctionScheduler: "
-                                "start delay must be non-negative");
+    throw std::invalid_argument(
+        "FunctionScheduler: start delay must be non-negative");
   }
 
   std::lock_guard<std::mutex> l(mutex_);
   // check if the nameID is unique
   for (const auto& f : functions_) {
     if (f.isValid() && f.name == nameID) {
   }
 
   std::lock_guard<std::mutex> l(mutex_);
   // check if the nameID is unique
   for (const auto& f : functions_) {
     if (f.isValid() && f.name == nameID) {
-      throw std::invalid_argument(to<string>(
-            "FunctionScheduler: a function named \"", nameID,
-            "\" already exists"));
+      throw std::invalid_argument(
+          to<std::string>("FunctionScheduler: a function named \"",
+                          nameID,
+                          "\" already exists"));
     }
   }
   if (currentFunction_ && currentFunction_->name == nameID) {
     }
   }
   if (currentFunction_ && currentFunction_->name == nameID) {
-    throw std::invalid_argument(to<string>(
-          "FunctionScheduler: a function named \"", nameID,
-          "\" already exists"));
+    throw std::invalid_argument(to<std::string>(
+        "FunctionScheduler: a function named \"", nameID, "\" already exists"));
   }
 
   }
 
-  functions_.emplace_back(cb, interval, nameID.str(), startDelay,
-                          latencyDistr.isPoisson, latencyDistr.poissonMean);
+  functions_.emplace_back(cb, intervalFunc, nameID, intervalDescr, startDelay);
   if (running_) {
   if (running_) {
-    functions_.back().setNextRunTime(steady_clock::now() + startDelay);
+    functions_.back().resetNextRunTime(steady_clock::now());
     std::push_heap(functions_.begin(), functions_.end(), fnCmp_);
     // Signal the running thread to wake up and see if it needs to change it's
     // current scheduling decision.
     std::push_heap(functions_.begin(), functions_.end(), fnCmp_);
     // Signal the running thread to wake up and see if it needs to change it's
     // current scheduling decision.
@@ -140,11 +241,10 @@ bool FunctionScheduler::start() {
   // Reset the next run time. for all functions.
   // note: this is needed since one can shutdown() and start() again
   for (auto& f : functions_) {
   // Reset the next run time. for all functions.
   // note: this is needed since one can shutdown() and start() again
   for (auto& f : functions_) {
-    f.setNextRunTime(now + f.startDelay);
-    VLOG(1) << "   - func: "
-            << (f.name.empty() ? "(anon)" : f.name.c_str())
-            << ", period = " << f.timeInterval.count()
-            << "ms, delay = " << f.startDelay.count() << "ms";
+    f.resetNextRunTime(now);
+    VLOG(1) << "   - func: " << (f.name.empty() ? "(anon)" : f.name.c_str())
+            << ", period = " << f.intervalDescr
+            << ", delay = " << f.startDelay.count() << "ms";
   }
   std::make_heap(functions_.begin(), functions_.end(), fnCmp_);
 
   }
   std::make_heap(functions_.begin(), functions_.end(), fnCmp_);
 
@@ -217,19 +317,23 @@ void FunctionScheduler::runOneFunction(std::unique_lock<std::mutex>& lock,
   // maintain the heap property on functions_ while mutex_ is unlocked.
   RepeatFunc func(std::move(functions_.back()));
   functions_.pop_back();
   // maintain the heap property on functions_ while mutex_ is unlocked.
   RepeatFunc func(std::move(functions_.back()));
   functions_.pop_back();
+  if (!func.cb) {
+    VLOG(5) << func.name << "function has been canceled while waiting";
+    return;
+  }
   currentFunction_ = &func;
 
   currentFunction_ = &func;
 
-  // Update the function's run time, and re-insert it into the heap.
+  // Update the function's next run time.
   if (steady_) {
     // This allows scheduler to catch up
   if (steady_) {
     // This allows scheduler to catch up
-    func.lastRunTime += func.timeInterval;
+    func.setNextRunTimeSteady();
   } else {
   } else {
-    // Note that we adjust lastRunTime to the current time where we started the
-    // function call, rather than the time when the function finishes.
+    // Note that we set nextRunTime based on the current time where we started
+    // the function call, rather than the time when the function finishes.
     // This ensures that we call the function once every time interval, as
     // opposed to waiting time interval seconds between calls.  (These can be
     // different if the function takes a significant amount of time to run.)
     // This ensures that we call the function once every time interval, as
     // opposed to waiting time interval seconds between calls.  (These can be
     // different if the function takes a significant amount of time to run.)
-    func.lastRunTime = now;
+    func.setNextRunTimeStrict(now);
   }
 
   // Release the lock while we invoke the user's function
   }
 
   // Release the lock while we invoke the user's function
@@ -259,9 +363,6 @@ void FunctionScheduler::runOneFunction(std::unique_lock<std::mutex>& lock,
   // Re-insert the function into our functions_ heap.
   // We only maintain the heap property while running_ is set.  (running_ may
   // have been cleared while we were invoking the user's function.)
   // Re-insert the function into our functions_ heap.
   // We only maintain the heap property while running_ is set.  (running_ may
   // have been cleared while we were invoking the user's function.)
-  if (func.isPoissonDistr) {
-    func.setTimeIntervalPoissonDistr();
-  }
   functions_.push_back(std::move(func));
   if (running_) {
     std::push_heap(functions_.begin(), functions_.end(), fnCmp_);
   functions_.push_back(std::move(func));
   if (running_) {
     std::push_heap(functions_.begin(), functions_.end(), fnCmp_);