folly::FunctionScheduler: Adding capability to reset a function's timer
authorJimmy Saade <jimmyjs@fb.com>
Mon, 14 Mar 2016 12:07:14 +0000 (05:07 -0700)
committerFacebook Github Bot 7 <facebook-github-bot-7-bot@fb.com>
Mon, 14 Mar 2016 12:20:21 +0000 (05:20 -0700)
Summary:Adding support for resetting a specified function's timer.

"Resetting a function's timer" effectively means "canceling whatever next runs it would have had, and treating it as though it were just added".
When `resetFunctionTimer` is called, the specified function's interval (timer) will be reset, and it will execute after its initially-specified `startDelay`. If the `startDelay` is zero, the function will execute immediately, and then be scheduled as before - once every `interval` milliseconds.

Motivation: batch processing of updates, where both a size and time limit are in play. If the size limit is reached, it makes sense to reset the timer for the scheduled function.

Differential Revision: D3045868

fb-gh-sync-id: a5ceb0069c04a77fdab16b61679987ee55484e89
shipit-source-id: a5ceb0069c04a77fdab16b61679987ee55484e89

folly/experimental/FunctionScheduler.cpp
folly/experimental/FunctionScheduler.h
folly/experimental/test/FunctionSchedulerTest.cpp

index f63c732ac9571da7e2deb7c75aedd8bcd7cafd77..d648ce2645181fc1816e25d588f5eaf4f52f9777 100644 (file)
@@ -157,7 +157,7 @@ void FunctionScheduler::addFunctionGenericDistribution(
         "FunctionScheduler: start delay must be non-negative");
   }
 
-  std::lock_guard<std::mutex> l(mutex_);
+  std::unique_lock<std::mutex> l(mutex_);
   // check if the nameID is unique
   for (const auto& f : functions_) {
     if (f.isValid() && f.name == nameID) {
@@ -172,21 +172,15 @@ void FunctionScheduler::addFunctionGenericDistribution(
         "FunctionScheduler: a function named \"", nameID, "\" already exists"));
   }
 
-  functions_.emplace_back(cb, intervalFunc, nameID, intervalDescr, startDelay);
-  if (running_) {
-    functions_.back().resetNextRunTime(steady_clock::now());
-    std::push_heap(functions_.begin(), functions_.end(), fnCmp_);
-    // Signal the running thread to wake up and see if it needs to change it's
-    // current scheduling decision.
-    runningCondvar_.notify_one();
-  }
+  addFunctionToHeap(
+      l, RepeatFunc(cb, intervalFunc, nameID, intervalDescr, startDelay));
 }
 
 bool FunctionScheduler::cancelFunction(StringPiece nameID) {
   std::unique_lock<std::mutex> l(mutex_);
 
   if (currentFunction_ && currentFunction_->name == nameID) {
-    // This function is currently being run.  Clear currentFunction_
+    // This function is currently being run. Clear currentFunction_
     // The running thread will see this and won't reschedule the function.
     currentFunction_ = nullptr;
     return true;
@@ -212,7 +206,7 @@ void FunctionScheduler::cancelFunction(const std::unique_lock<std::mutex>& l,
     // heap.  Unfortunately it isn't part of the standard API.
     //
     // For now we just leave the RepeatFunc in our heap, but mark it as unused.
-    // When it's nextTimeInterval comes up, the runner thread will pop it from
+    // When its nextTimeInterval comes up, the runner thread will pop it from
     // the heap and simply throw it away.
     it->cancel();
   } else {
@@ -227,6 +221,32 @@ void FunctionScheduler::cancelAllFunctions() {
   functions_.clear();
 }
 
+bool FunctionScheduler::resetFunctionTimer(StringPiece nameID) {
+  std::unique_lock<std::mutex> l(mutex_);
+  if (currentFunction_ && currentFunction_->name == nameID) {
+    RepeatFunc* funcPtrCopy = currentFunction_;
+    // This function is currently being run. Clear currentFunction_
+    // to avoid rescheduling it, and add the function again to honor the
+    // startDelay.
+    currentFunction_ = nullptr;
+    addFunctionToHeap(l, std::move(*funcPtrCopy));
+    return true;
+  }
+
+  // Since __adjust_heap() isn't a part of the standard API, there's no way to
+  // fix the heap ordering if we adjust the key (nextRunTime) for the existing
+  // RepeatFunc. Instead, we just cancel it and add an identical object.
+  for (auto it = functions_.begin(); it != functions_.end(); ++it) {
+    if (it->isValid() && it->name == nameID) {
+      RepeatFunc funcCopy(std::move(*it));
+      cancelFunction(l, it);
+      addFunctionToHeap(l, std::move(funcCopy));
+      return true;
+    }
+  }
+  return false;
+}
+
 bool FunctionScheduler::start() {
   std::unique_lock<std::mutex> l(mutex_);
   if (running_) {
@@ -369,6 +389,23 @@ void FunctionScheduler::runOneFunction(std::unique_lock<std::mutex>& lock,
   }
 }
 
+void FunctionScheduler::addFunctionToHeap(
+    const std::unique_lock<std::mutex>& lock,
+    RepeatFunc&& func) {
+  // This function should only be called with mutex_ already locked.
+  DCHECK(lock.mutex() == &mutex_);
+  DCHECK(lock.owns_lock());
+
+  functions_.emplace_back(std::move(func));
+  if (running_) {
+    functions_.back().resetNextRunTime(steady_clock::now());
+    std::push_heap(functions_.begin(), functions_.end(), fnCmp_);
+    // Signal the running thread to wake up and see if it needs to change
+    // its current scheduling decision.
+    runningCondvar_.notify_one();
+  }
+}
+
 void FunctionScheduler::setThreadName(StringPiece threadName) {
   std::unique_lock<std::mutex> l(mutex_);
   threadName_ = threadName.str();
index 0f0a848b219867f1af9898b4bebe2e91dca6f763..08976f07b58e319732368baa06e37228b3301830 100644 (file)
@@ -155,6 +155,17 @@ class FunctionScheduler {
    */
   void cancelAllFunctions();
 
+  /**
+   * Resets the specified function's timer.
+   * When resetFunctionTimer is called, the specified function's timer will
+   * be reset with the same parameters it was passed initially, including
+   * its startDelay. If the startDelay was 0, the function will be invoked
+   * immediately.
+   *
+   * Returns false if no function exists with the specified name.
+   */
+  bool resetFunctionTimer(StringPiece nameID);
+
   /**
    * Starts the scheduler.
    *
@@ -225,6 +236,8 @@ class FunctionScheduler {
                       std::chrono::steady_clock::time_point now);
   void cancelFunction(const std::unique_lock<std::mutex>& lock,
                       FunctionHeap::iterator it);
+  void addFunctionToHeap(const std::unique_lock<std::mutex>& lock,
+                         RepeatFunc&& func);
 
   std::thread thread_;
 
index d8c0fd19336fadfd0cbe4ce2bedd702f23841618..8e0ea8e1b6fddb3186254155c3f46012bec913c4 100644 (file)
@@ -211,6 +211,27 @@ TEST(FunctionScheduler, ShutdownStart) {
   EXPECT_EQ(6, total);
 }
 
+TEST(FunctionScheduler, ResetFunc) {
+  int total = 0;
+  FunctionScheduler fs;
+  fs.addFunction([&] { total += 2; }, testInterval(3), "add2");
+  fs.addFunction([&] { total += 3; }, testInterval(3), "add3");
+  fs.start();
+  delay(1);
+  EXPECT_EQ(5, total);
+  EXPECT_FALSE(fs.resetFunctionTimer("NON_EXISTING"));
+  EXPECT_TRUE(fs.resetFunctionTimer("add2"));
+  delay(1);
+  // t2: after the reset, add2 should have been invoked immediately
+  EXPECT_EQ(7, total);
+  usleep(150000);
+  // t3.5: add3 should have been invoked. add2 should not
+  EXPECT_EQ(10, total);
+  delay(1);
+  // t4.5: add2 should have been invoked once more (it was reset at t1)
+  EXPECT_EQ(12, total);
+}
+
 TEST(FunctionScheduler, AddInvalid) {
   int total = 0;
   FunctionScheduler fs;