Consistently have the namespace closing comment
[folly.git] / folly / experimental / FunctionScheduler.cpp
index 9a12256ffb84020716336bcdcdb2cf02cd3c4962..ce58c43c69648ecbaea4bdda8247df87681b3fb5 100644 (file)
@@ -1,5 +1,5 @@
 /*
- * Copyright 2015 Facebook, Inc.
+ * Copyright 2017 Facebook, Inc.
  *
  * Licensed under the Apache License, Version 2.0 (the "License");
  * you may not use this file except in compliance with the License.
  */
 
 #include <folly/experimental/FunctionScheduler.h>
-#include <folly/ThreadName.h>
+
+#include <random>
+
 #include <folly/Conv.h>
+#include <folly/Random.h>
 #include <folly/String.h>
+#include <folly/system/ThreadName.h>
 
-#ifdef _POSIX_MONOTONIC_CLOCK
-#define FOLLY_TIME_MONOTONIC_CLOCK CLOCK_MONOTONIC
-#else
-#define FOLLY_TIME_MONOTONIC_CLOCK CLOCK_REALTIME
-#endif
-
-using namespace std;
-using std::chrono::seconds;
 using std::chrono::milliseconds;
+using std::chrono::steady_clock;
+
+namespace folly {
+
+namespace {
+
+struct ConstIntervalFunctor {
+  const milliseconds constInterval;
 
-static milliseconds nowInMS() {
-  struct timespec ts /*= void*/;
-  if (clock_gettime(FOLLY_TIME_MONOTONIC_CLOCK, &ts)) {
-    // Only possible failures are EFAULT or EINVAL, both practically
-    // impossible. But an assert can't hurt.
-    assert(false);
+  explicit ConstIntervalFunctor(milliseconds interval)
+      : constInterval(interval) {
+    if (interval < milliseconds::zero()) {
+      throw std::invalid_argument(
+          "FunctionScheduler: "
+          "time interval must be non-negative");
+    }
   }
-  return milliseconds(
-    static_cast<int64_t>(ts.tv_sec * 1000.0 + ts.tv_nsec / 1000000.0 + 0.5));
-}
 
-namespace folly {
+  milliseconds operator()() const { return constInterval; }
+};
 
-FunctionScheduler::FunctionScheduler() {
-}
+struct PoissonDistributionFunctor {
+  std::default_random_engine generator;
+  std::poisson_distribution<int> poissonRandom;
+
+  explicit PoissonDistributionFunctor(double meanPoissonMs)
+      : poissonRandom(meanPoissonMs) {
+    if (meanPoissonMs < 0.0) {
+      throw std::invalid_argument(
+          "FunctionScheduler: "
+          "Poisson mean interval must be non-negative");
+    }
+  }
+
+  milliseconds operator()() { return milliseconds(poissonRandom(generator)); }
+};
+
+struct UniformDistributionFunctor {
+  std::default_random_engine generator;
+  std::uniform_int_distribution<milliseconds::rep> dist;
+
+  UniformDistributionFunctor(milliseconds minInterval, milliseconds maxInterval)
+      : generator(Random::rand32()),
+        dist(minInterval.count(), maxInterval.count()) {
+    if (minInterval > maxInterval) {
+      throw std::invalid_argument(
+          "FunctionScheduler: "
+          "min time interval must be less or equal than max interval");
+    }
+    if (minInterval < milliseconds::zero()) {
+      throw std::invalid_argument(
+          "FunctionScheduler: "
+          "time interval must be non-negative");
+    }
+  }
+
+  milliseconds operator()() { return milliseconds(dist(generator)); }
+};
+
+} // namespace
+
+FunctionScheduler::FunctionScheduler() {}
 
 FunctionScheduler::~FunctionScheduler() {
   // make sure to stop the thread (if running)
   shutdown();
 }
 
-void FunctionScheduler::addFunction(const std::function<void()>& cb,
+void FunctionScheduler::addFunction(Function<void()>&& cb,
                                     milliseconds interval,
                                     StringPiece nameID,
                                     milliseconds startDelay) {
-  LatencyDistribution latencyDistr(false, 0.0);
-  addFunctionInternal(cb, interval,
-                      latencyDistr, nameID, startDelay);
+  addFunctionInternal(
+      std::move(cb),
+      ConstIntervalFunctor(interval),
+      nameID.str(),
+      to<std::string>(interval.count(), "ms"),
+      startDelay,
+      false /*runOnce*/);
 }
 
-void FunctionScheduler::addFunctionInternal(const std::function<void()>& cb,
+void FunctionScheduler::addFunction(Function<void()>&& cb,
                                     milliseconds interval,
                                     const LatencyDistribution& latencyDistr,
                                     StringPiece nameID,
                                     milliseconds startDelay) {
-  if (interval < milliseconds::zero()) {
-    throw std::invalid_argument("FunctionScheduler: "
-                                "time interval must be non-negative");
+  if (latencyDistr.isPoisson) {
+    addFunctionInternal(
+        std::move(cb),
+        PoissonDistributionFunctor(latencyDistr.poissonMean),
+        nameID.str(),
+        to<std::string>(latencyDistr.poissonMean, "ms (Poisson mean)"),
+        startDelay,
+        false /*runOnce*/);
+  } else {
+    addFunction(std::move(cb), interval, nameID, startDelay);
+  }
+}
+
+void FunctionScheduler::addFunctionOnce(
+    Function<void()>&& cb,
+    StringPiece nameID,
+    milliseconds startDelay) {
+  addFunctionInternal(
+      std::move(cb),
+      ConstIntervalFunctor(milliseconds::zero()),
+      nameID.str(),
+      "once",
+      startDelay,
+      true /*runOnce*/);
+}
+
+void FunctionScheduler::addFunctionUniformDistribution(
+    Function<void()>&& cb,
+    milliseconds minInterval,
+    milliseconds maxInterval,
+    StringPiece nameID,
+    milliseconds startDelay) {
+  addFunctionInternal(
+      std::move(cb),
+      UniformDistributionFunctor(minInterval, maxInterval),
+      nameID.str(),
+      to<std::string>(
+          "[", minInterval.count(), " , ", maxInterval.count(), "] ms"),
+      startDelay,
+      false /*runOnce*/);
+}
+
+void FunctionScheduler::addFunctionGenericDistribution(
+    Function<void()>&& cb,
+    IntervalDistributionFunc&& intervalFunc,
+    const std::string& nameID,
+    const std::string& intervalDescr,
+    milliseconds startDelay) {
+  addFunctionInternal(
+      std::move(cb),
+      std::move(intervalFunc),
+      nameID,
+      intervalDescr,
+      startDelay,
+      false /*runOnce*/);
+}
+
+void FunctionScheduler::addFunctionInternal(
+    Function<void()>&& cb,
+    IntervalDistributionFunc&& intervalFunc,
+    const std::string& nameID,
+    const std::string& intervalDescr,
+    milliseconds startDelay,
+    bool runOnce) {
+  if (!cb) {
+    throw std::invalid_argument(
+        "FunctionScheduler: Scheduled function must be set");
+  }
+  if (!intervalFunc) {
+    throw std::invalid_argument(
+        "FunctionScheduler: interval distribution function must be set");
   }
   if (startDelay < milliseconds::zero()) {
-    throw std::invalid_argument("FunctionScheduler: "
-                                "start delay must be non-negative");
+    throw std::invalid_argument(
+        "FunctionScheduler: start delay must be non-negative");
   }
 
-  std::lock_guard<std::mutex> l(mutex_);
+  std::unique_lock<std::mutex> l(mutex_);
+  auto it = functionsMap_.find(nameID);
   // check if the nameID is unique
-  for (const auto& f : functions_) {
-    if (f.isValid() && f.name == nameID) {
-      throw std::invalid_argument(to<string>(
-            "FunctionScheduler: a function named \"", nameID,
-            "\" already exists"));
-    }
+  if (it != functionsMap_.end() && it->second->isValid()) {
+    throw std::invalid_argument(to<std::string>(
+        "FunctionScheduler: a function named \"", nameID, "\" already exists"));
   }
+
   if (currentFunction_ && currentFunction_->name == nameID) {
-    throw std::invalid_argument(to<string>(
-          "FunctionScheduler: a function named \"", nameID,
-          "\" already exists"));
+    throw std::invalid_argument(to<std::string>(
+        "FunctionScheduler: a function named \"", nameID, "\" already exists"));
   }
 
-  functions_.emplace_back(cb, interval, nameID.str(), startDelay,
-                          latencyDistr.isPoisson, latencyDistr.poissonMean);
-  if (running_) {
-    functions_.back().setNextRunTime(nowInMS() + startDelay);
-    std::push_heap(functions_.begin(), functions_.end(), fnCmp_);
-    // Signal the running thread to wake up and see if it needs to change it's
-    // current scheduling decision.
-    runningCondvar_.notify_one();
+  addFunctionToHeap(
+      l,
+      std::make_unique<RepeatFunc>(
+          std::move(cb),
+          std::move(intervalFunc),
+          nameID,
+          intervalDescr,
+          startDelay,
+          runOnce));
+}
+
+bool FunctionScheduler::cancelFunctionWithLock(
+    std::unique_lock<std::mutex>& lock,
+    StringPiece nameID) {
+  CHECK_EQ(lock.owns_lock(), true);
+  if (currentFunction_ && currentFunction_->name == nameID) {
+    functionsMap_.erase(currentFunction_->name);
+    // This function is currently being run. Clear currentFunction_
+    // The running thread will see this and won't reschedule the function.
+    currentFunction_ = nullptr;
+    cancellingCurrentFunction_ = true;
+    return true;
   }
+  return false;
 }
 
 bool FunctionScheduler::cancelFunction(StringPiece nameID) {
   std::unique_lock<std::mutex> l(mutex_);
+  if (cancelFunctionWithLock(l, nameID)) {
+    return true;
+  }
+  auto it = functionsMap_.find(nameID);
+  if (it != functionsMap_.end() && it->second->isValid()) {
+    cancelFunction(l, it->second);
+    return true;
+  }
 
-  if (currentFunction_ && currentFunction_->name == nameID) {
-    // This function is currently being run.  Clear currentFunction_
-    // The running thread will see this and won't reschedule the function.
-    currentFunction_ = nullptr;
+  return false;
+}
+
+bool FunctionScheduler::cancelFunctionAndWait(StringPiece nameID) {
+  std::unique_lock<std::mutex> l(mutex_);
+
+  if (cancelFunctionWithLock(l, nameID)) {
+    runningCondvar_.wait(l, [this]() { return !cancellingCurrentFunction_; });
     return true;
   }
 
-  for (auto it = functions_.begin(); it != functions_.end(); ++it) {
-    if (it->isValid() && it->name == nameID) {
-      cancelFunction(l, it);
+  auto it = functionsMap_.find(nameID);
+    if (it != functionsMap_.end() && it->second->isValid()) {
+      cancelFunction(l, it->second);
       return true;
     }
-  }
   return false;
 }
 
 void FunctionScheduler::cancelFunction(const std::unique_lock<std::mutex>& l,
-                                       FunctionHeap::iterator it) {
+                                      RepeatFunc* it) {
   // This function should only be called with mutex_ already locked.
   DCHECK(l.mutex() == &mutex_);
   DCHECK(l.owns_lock());
+  functionsMap_.erase(it->name);
+  it->cancel();
+}
 
-  if (running_) {
-    // Internally gcc has an __adjust_heap() function to fill in a hole in the
-    // heap.  Unfortunately it isn't part of the standard API.
-    //
-    // For now we just leave the RepeatFunc in our heap, but mark it as unused.
-    // When it's nextTimeInterval comes up, the runner thread will pop it from
-    // the heap and simply throw it away.
-    it->cancel();
-  } else {
-    // We're not running, so functions_ doesn't need to be maintained in heap
-    // order.
-    functions_.erase(it);
+bool FunctionScheduler::cancelAllFunctionsWithLock(
+    std::unique_lock<std::mutex>& lock) {
+  CHECK_EQ(lock.owns_lock(), true);
+  functions_.clear();
+  functionsMap_.clear();
+  if (currentFunction_) {
+    cancellingCurrentFunction_ = true;
   }
+  currentFunction_ = nullptr;
+  return cancellingCurrentFunction_;
 }
 
 void FunctionScheduler::cancelAllFunctions() {
   std::unique_lock<std::mutex> l(mutex_);
-  functions_.clear();
+  cancelAllFunctionsWithLock(l);
+}
+
+void FunctionScheduler::cancelAllFunctionsAndWait() {
+  std::unique_lock<std::mutex> l(mutex_);
+  if (cancelAllFunctionsWithLock(l)) {
+    runningCondvar_.wait(l, [this]() { return !cancellingCurrentFunction_; });
+  }
+}
+
+bool FunctionScheduler::resetFunctionTimer(StringPiece nameID) {
+  std::unique_lock<std::mutex> l(mutex_);
+  if (currentFunction_ && currentFunction_->name == nameID) {
+    // TODO: This moves out of RepeatFunc object while folly:Function can
+    // potentially be executed. This might be unsafe.
+    auto funcPtrCopy = std::make_unique<RepeatFunc>(std::move(*currentFunction_));
+    // This function is currently being run. Clear currentFunction_
+    // to avoid rescheduling it, and add the function again to honor the
+    // startDelay.
+    currentFunction_ = nullptr;
+    addFunctionToHeap(l, std::move(funcPtrCopy));
+    return true;
+  }
+
+  // Since __adjust_heap() isn't a part of the standard API, there's no way to
+  // fix the heap ordering if we adjust the key (nextRunTime) for the existing
+  // RepeatFunc. Instead, we just cancel it and add an identical object.
+  auto it = functionsMap_.find(nameID);
+
+  if (it != functionsMap_.end() && it->second->isValid()) {
+    auto funcCopy = std::make_unique<RepeatFunc>(std::move(*(it->second)));
+    it->second->cancel();
+    // This will take care of making sure that functionsMap_[it->first] =
+    // funcCopy.
+    addFunctionToHeap(l, std::move(funcCopy));
+    return true;
+  }
+  return false;
 }
 
 bool FunctionScheduler::start() {
@@ -150,37 +325,37 @@ bool FunctionScheduler::start() {
     return false;
   }
 
-  running_ = true;
-
   VLOG(1) << "Starting FunctionScheduler with " << functions_.size()
           << " functions.";
-  milliseconds now(nowInMS());
+  auto now = steady_clock::now();
   // Reset the next run time. for all functions.
   // note: this is needed since one can shutdown() and start() again
-  for (auto& f : functions_) {
-    f.setNextRunTime(now + f.startDelay);
-    VLOG(1) << "   - func: "
-            << (f.name.empty() ? "(anon)" : f.name.c_str())
-            << ", period = " << f.timeInterval.count()
-            << "ms, delay = " << f.startDelay.count() << "ms";
+  for (const auto& f : functions_) {
+    f->resetNextRunTime(now);
+    VLOG(1) << "   - func: " << (f->name.empty() ? "(anon)" : f->name.c_str())
+            << ", period = " << f->intervalDescr
+            << ", delay = " << f->startDelay.count() << "ms";
   }
   std::make_heap(functions_.begin(), functions_.end(), fnCmp_);
 
   thread_ = std::thread([&] { this->run(); });
+  running_ = true;
+
   return true;
 }
 
-void FunctionScheduler::shutdown() {
+bool FunctionScheduler::shutdown() {
   {
     std::lock_guard<std::mutex> g(mutex_);
     if (!running_) {
-      return;
+      return false;
     }
 
     running_ = false;
     runningCondvar_.notify_one();
   }
   thread_.join();
+  return true;
 }
 
 void FunctionScheduler::run() {
@@ -198,22 +373,23 @@ void FunctionScheduler::run() {
       continue;
     }
 
-    milliseconds now(nowInMS());
+    auto now = steady_clock::now();
 
     // Move the next function to run to the end of functions_
     std::pop_heap(functions_.begin(), functions_.end(), fnCmp_);
 
     // Check to see if the function was cancelled.
     // If so, just remove it and continue around the loop.
-    if (!functions_.back().isValid()) {
+    if (!functions_.back()->isValid()) {
       functions_.pop_back();
       continue;
     }
 
-    auto sleepTime = functions_.back().getNextRunTime() - now;
+    auto sleepTime = functions_.back()->getNextRunTime() - now;
     if (sleepTime < milliseconds::zero()) {
       // We need to run this function now
       runOneFunction(lock, now);
+      runningCondvar_.notify_all();
     } else {
       // Re-add the function to the heap, and wait until we actually
       // need to run it.
@@ -224,7 +400,7 @@ void FunctionScheduler::run() {
 }
 
 void FunctionScheduler::runOneFunction(std::unique_lock<std::mutex>& lock,
-                                       std::chrono::milliseconds now) {
+                                       steady_clock::time_point now) {
   DCHECK(lock.mutex() == &mutex_);
   DCHECK(lock.owns_lock());
 
@@ -233,21 +409,24 @@ void FunctionScheduler::runOneFunction(std::unique_lock<std::mutex>& lock,
   // Fully remove it from functions_ now.
   // We need to release mutex_ while we invoke this function, and we need to
   // maintain the heap property on functions_ while mutex_ is unlocked.
-  RepeatFunc func(std::move(functions_.back()));
+  auto func = std::move(functions_.back());
   functions_.pop_back();
-  currentFunction_ = &func;
-
-  // Update the function's run time, and re-insert it into the heap.
+  if (!func->cb) {
+    VLOG(5) << func->name << "function has been canceled while waiting";
+    return;
+  }
+  currentFunction_ = func.get();
+  // Update the function's next run time.
   if (steady_) {
     // This allows scheduler to catch up
-    func.lastRunTime += func.timeInterval;
+    func->setNextRunTimeSteady();
   } else {
-    // Note that we adjust lastRunTime to the current time where we started the
-    // function call, rather than the time when the function finishes.
+    // Note that we set nextRunTime based on the current time where we started
+    // the function call, rather than the time when the function finishes.
     // This ensures that we call the function once every time interval, as
     // opposed to waiting time interval seconds between calls.  (These can be
     // different if the function takes a significant amount of time to run.)
-    func.lastRunTime = now;
+    func->setNextRunTimeStrict(now);
   }
 
   // Release the lock while we invoke the user's function
@@ -255,11 +434,11 @@ void FunctionScheduler::runOneFunction(std::unique_lock<std::mutex>& lock,
 
   // Invoke the function
   try {
-    VLOG(5) << "Now running " << func.name;
-    func.cb();
+    VLOG(5) << "Now running " << func->name;
+    func->cb();
   } catch (const std::exception& ex) {
     LOG(ERROR) << "Error running the scheduled function <"
-      << func.name << ">: " << exceptionStr(ex);
+      << func->name << ">: " << exceptionStr(ex);
   }
 
   // Re-acquire the lock
@@ -268,21 +447,44 @@ void FunctionScheduler::runOneFunction(std::unique_lock<std::mutex>& lock,
   if (!currentFunction_) {
     // The function was cancelled while we were running it.
     // We shouldn't reschedule it;
+    cancellingCurrentFunction_ = false;
+    return;
+  }
+  if (currentFunction_->runOnce) {
+    // Don't reschedule if the function only needed to run once.
+    functionsMap_.erase(currentFunction_->name);
+    currentFunction_ = nullptr;
     return;
   }
-  // Clear currentFunction_
-  CHECK_EQ(currentFunction_, &func);
-  currentFunction_ = nullptr;
 
   // Re-insert the function into our functions_ heap.
   // We only maintain the heap property while running_ is set.  (running_ may
   // have been cleared while we were invoking the user's function.)
-  if (func.isPoissonDistr) {
-    func.setTimeIntervalPoissonDistr();
+  functions_.push_back(std::move(func));
+
+  // Clear currentFunction_
+  currentFunction_ = nullptr;
+
+  if (running_) {
+    std::push_heap(functions_.begin(), functions_.end(), fnCmp_);
   }
+}
+
+void FunctionScheduler::addFunctionToHeap(
+    const std::unique_lock<std::mutex>& lock,
+    std::unique_ptr<RepeatFunc> func) {
+  // This function should only be called with mutex_ already locked.
+  DCHECK(lock.mutex() == &mutex_);
+  DCHECK(lock.owns_lock());
+
   functions_.push_back(std::move(func));
+  functionsMap_[functions_.back()->name] = functions_.back().get();
   if (running_) {
+    functions_.back()->resetNextRunTime(steady_clock::now());
     std::push_heap(functions_.begin(), functions_.end(), fnCmp_);
+    // Signal the running thread to wake up and see if it needs to change
+    // its current scheduling decision.
+    runningCondvar_.notify_one();
   }
 }
 
@@ -291,4 +493,4 @@ void FunctionScheduler::setThreadName(StringPiece threadName) {
   threadName_ = threadName.str();
 }
 
-}
+} // namespace folly