folly::FunctionScheduler: Adding capability to reset a function's timer
[folly.git] / folly / experimental / FunctionScheduler.cpp
index f63c732ac9571da7e2deb7c75aedd8bcd7cafd77..d648ce2645181fc1816e25d588f5eaf4f52f9777 100644 (file)
@@ -157,7 +157,7 @@ void FunctionScheduler::addFunctionGenericDistribution(
         "FunctionScheduler: start delay must be non-negative");
   }
 
-  std::lock_guard<std::mutex> l(mutex_);
+  std::unique_lock<std::mutex> l(mutex_);
   // check if the nameID is unique
   for (const auto& f : functions_) {
     if (f.isValid() && f.name == nameID) {
@@ -172,21 +172,15 @@ void FunctionScheduler::addFunctionGenericDistribution(
         "FunctionScheduler: a function named \"", nameID, "\" already exists"));
   }
 
-  functions_.emplace_back(cb, intervalFunc, nameID, intervalDescr, startDelay);
-  if (running_) {
-    functions_.back().resetNextRunTime(steady_clock::now());
-    std::push_heap(functions_.begin(), functions_.end(), fnCmp_);
-    // Signal the running thread to wake up and see if it needs to change it's
-    // current scheduling decision.
-    runningCondvar_.notify_one();
-  }
+  addFunctionToHeap(
+      l, RepeatFunc(cb, intervalFunc, nameID, intervalDescr, startDelay));
 }
 
 bool FunctionScheduler::cancelFunction(StringPiece nameID) {
   std::unique_lock<std::mutex> l(mutex_);
 
   if (currentFunction_ && currentFunction_->name == nameID) {
-    // This function is currently being run.  Clear currentFunction_
+    // This function is currently being run. Clear currentFunction_
     // The running thread will see this and won't reschedule the function.
     currentFunction_ = nullptr;
     return true;
@@ -212,7 +206,7 @@ void FunctionScheduler::cancelFunction(const std::unique_lock<std::mutex>& l,
     // heap.  Unfortunately it isn't part of the standard API.
     //
     // For now we just leave the RepeatFunc in our heap, but mark it as unused.
-    // When it's nextTimeInterval comes up, the runner thread will pop it from
+    // When its nextTimeInterval comes up, the runner thread will pop it from
     // the heap and simply throw it away.
     it->cancel();
   } else {
@@ -227,6 +221,32 @@ void FunctionScheduler::cancelAllFunctions() {
   functions_.clear();
 }
 
+bool FunctionScheduler::resetFunctionTimer(StringPiece nameID) {
+  std::unique_lock<std::mutex> l(mutex_);
+  if (currentFunction_ && currentFunction_->name == nameID) {
+    RepeatFunc* funcPtrCopy = currentFunction_;
+    // This function is currently being run. Clear currentFunction_
+    // to avoid rescheduling it, and add the function again to honor the
+    // startDelay.
+    currentFunction_ = nullptr;
+    addFunctionToHeap(l, std::move(*funcPtrCopy));
+    return true;
+  }
+
+  // Since __adjust_heap() isn't a part of the standard API, there's no way to
+  // fix the heap ordering if we adjust the key (nextRunTime) for the existing
+  // RepeatFunc. Instead, we just cancel it and add an identical object.
+  for (auto it = functions_.begin(); it != functions_.end(); ++it) {
+    if (it->isValid() && it->name == nameID) {
+      RepeatFunc funcCopy(std::move(*it));
+      cancelFunction(l, it);
+      addFunctionToHeap(l, std::move(funcCopy));
+      return true;
+    }
+  }
+  return false;
+}
+
 bool FunctionScheduler::start() {
   std::unique_lock<std::mutex> l(mutex_);
   if (running_) {
@@ -369,6 +389,23 @@ void FunctionScheduler::runOneFunction(std::unique_lock<std::mutex>& lock,
   }
 }
 
+void FunctionScheduler::addFunctionToHeap(
+    const std::unique_lock<std::mutex>& lock,
+    RepeatFunc&& func) {
+  // This function should only be called with mutex_ already locked.
+  DCHECK(lock.mutex() == &mutex_);
+  DCHECK(lock.owns_lock());
+
+  functions_.emplace_back(std::move(func));
+  if (running_) {
+    functions_.back().resetNextRunTime(steady_clock::now());
+    std::push_heap(functions_.begin(), functions_.end(), fnCmp_);
+    // Signal the running thread to wake up and see if it needs to change
+    // its current scheduling decision.
+    runningCondvar_.notify_one();
+  }
+}
+
 void FunctionScheduler::setThreadName(StringPiece threadName) {
   std::unique_lock<std::mutex> l(mutex_);
   threadName_ = threadName.str();