From 13c58db990deb957dc178b48cf77c0c9aeb3876d Mon Sep 17 00:00:00 2001 From: Jimmy Saade Date: Mon, 14 Mar 2016 05:07:14 -0700 Subject: [PATCH] folly::FunctionScheduler: Adding capability to reset a function's timer Summary:Adding support for resetting a specified function's timer. "Resetting a function's timer" effectively means "canceling whatever next runs it would have had, and treating it as though it were just added". When `resetFunctionTimer` is called, the specified function's interval (timer) will be reset, and it will execute after its initially-specified `startDelay`. If the `startDelay` is zero, the function will execute immediately, and then be scheduled as before - once every `interval` milliseconds. Motivation: batch processing of updates, where both a size and time limit are in play. If the size limit is reached, it makes sense to reset the timer for the scheduled function. Differential Revision: D3045868 fb-gh-sync-id: a5ceb0069c04a77fdab16b61679987ee55484e89 shipit-source-id: a5ceb0069c04a77fdab16b61679987ee55484e89 --- folly/experimental/FunctionScheduler.cpp | 59 +++++++++++++++---- folly/experimental/FunctionScheduler.h | 13 ++++ .../test/FunctionSchedulerTest.cpp | 21 +++++++ 3 files changed, 82 insertions(+), 11 deletions(-) diff --git a/folly/experimental/FunctionScheduler.cpp b/folly/experimental/FunctionScheduler.cpp index f63c732a..d648ce26 100644 --- a/folly/experimental/FunctionScheduler.cpp +++ b/folly/experimental/FunctionScheduler.cpp @@ -157,7 +157,7 @@ void FunctionScheduler::addFunctionGenericDistribution( "FunctionScheduler: start delay must be non-negative"); } - std::lock_guard l(mutex_); + std::unique_lock l(mutex_); // check if the nameID is unique for (const auto& f : functions_) { if (f.isValid() && f.name == nameID) { @@ -172,21 +172,15 @@ void FunctionScheduler::addFunctionGenericDistribution( "FunctionScheduler: a function named \"", nameID, "\" already exists")); } - functions_.emplace_back(cb, intervalFunc, nameID, intervalDescr, startDelay); - if (running_) { - functions_.back().resetNextRunTime(steady_clock::now()); - std::push_heap(functions_.begin(), functions_.end(), fnCmp_); - // Signal the running thread to wake up and see if it needs to change it's - // current scheduling decision. - runningCondvar_.notify_one(); - } + addFunctionToHeap( + l, RepeatFunc(cb, intervalFunc, nameID, intervalDescr, startDelay)); } bool FunctionScheduler::cancelFunction(StringPiece nameID) { std::unique_lock l(mutex_); if (currentFunction_ && currentFunction_->name == nameID) { - // This function is currently being run. Clear currentFunction_ + // This function is currently being run. Clear currentFunction_ // The running thread will see this and won't reschedule the function. currentFunction_ = nullptr; return true; @@ -212,7 +206,7 @@ void FunctionScheduler::cancelFunction(const std::unique_lock& l, // heap. Unfortunately it isn't part of the standard API. // // For now we just leave the RepeatFunc in our heap, but mark it as unused. - // When it's nextTimeInterval comes up, the runner thread will pop it from + // When its nextTimeInterval comes up, the runner thread will pop it from // the heap and simply throw it away. it->cancel(); } else { @@ -227,6 +221,32 @@ void FunctionScheduler::cancelAllFunctions() { functions_.clear(); } +bool FunctionScheduler::resetFunctionTimer(StringPiece nameID) { + std::unique_lock l(mutex_); + if (currentFunction_ && currentFunction_->name == nameID) { + RepeatFunc* funcPtrCopy = currentFunction_; + // This function is currently being run. Clear currentFunction_ + // to avoid rescheduling it, and add the function again to honor the + // startDelay. + currentFunction_ = nullptr; + addFunctionToHeap(l, std::move(*funcPtrCopy)); + return true; + } + + // Since __adjust_heap() isn't a part of the standard API, there's no way to + // fix the heap ordering if we adjust the key (nextRunTime) for the existing + // RepeatFunc. Instead, we just cancel it and add an identical object. + for (auto it = functions_.begin(); it != functions_.end(); ++it) { + if (it->isValid() && it->name == nameID) { + RepeatFunc funcCopy(std::move(*it)); + cancelFunction(l, it); + addFunctionToHeap(l, std::move(funcCopy)); + return true; + } + } + return false; +} + bool FunctionScheduler::start() { std::unique_lock l(mutex_); if (running_) { @@ -369,6 +389,23 @@ void FunctionScheduler::runOneFunction(std::unique_lock& lock, } } +void FunctionScheduler::addFunctionToHeap( + const std::unique_lock& lock, + RepeatFunc&& func) { + // This function should only be called with mutex_ already locked. + DCHECK(lock.mutex() == &mutex_); + DCHECK(lock.owns_lock()); + + functions_.emplace_back(std::move(func)); + if (running_) { + functions_.back().resetNextRunTime(steady_clock::now()); + std::push_heap(functions_.begin(), functions_.end(), fnCmp_); + // Signal the running thread to wake up and see if it needs to change + // its current scheduling decision. + runningCondvar_.notify_one(); + } +} + void FunctionScheduler::setThreadName(StringPiece threadName) { std::unique_lock l(mutex_); threadName_ = threadName.str(); diff --git a/folly/experimental/FunctionScheduler.h b/folly/experimental/FunctionScheduler.h index 0f0a848b..08976f07 100644 --- a/folly/experimental/FunctionScheduler.h +++ b/folly/experimental/FunctionScheduler.h @@ -155,6 +155,17 @@ class FunctionScheduler { */ void cancelAllFunctions(); + /** + * Resets the specified function's timer. + * When resetFunctionTimer is called, the specified function's timer will + * be reset with the same parameters it was passed initially, including + * its startDelay. If the startDelay was 0, the function will be invoked + * immediately. + * + * Returns false if no function exists with the specified name. + */ + bool resetFunctionTimer(StringPiece nameID); + /** * Starts the scheduler. * @@ -225,6 +236,8 @@ class FunctionScheduler { std::chrono::steady_clock::time_point now); void cancelFunction(const std::unique_lock& lock, FunctionHeap::iterator it); + void addFunctionToHeap(const std::unique_lock& lock, + RepeatFunc&& func); std::thread thread_; diff --git a/folly/experimental/test/FunctionSchedulerTest.cpp b/folly/experimental/test/FunctionSchedulerTest.cpp index d8c0fd19..8e0ea8e1 100644 --- a/folly/experimental/test/FunctionSchedulerTest.cpp +++ b/folly/experimental/test/FunctionSchedulerTest.cpp @@ -211,6 +211,27 @@ TEST(FunctionScheduler, ShutdownStart) { EXPECT_EQ(6, total); } +TEST(FunctionScheduler, ResetFunc) { + int total = 0; + FunctionScheduler fs; + fs.addFunction([&] { total += 2; }, testInterval(3), "add2"); + fs.addFunction([&] { total += 3; }, testInterval(3), "add3"); + fs.start(); + delay(1); + EXPECT_EQ(5, total); + EXPECT_FALSE(fs.resetFunctionTimer("NON_EXISTING")); + EXPECT_TRUE(fs.resetFunctionTimer("add2")); + delay(1); + // t2: after the reset, add2 should have been invoked immediately + EXPECT_EQ(7, total); + usleep(150000); + // t3.5: add3 should have been invoked. add2 should not + EXPECT_EQ(10, total); + delay(1); + // t4.5: add2 should have been invoked once more (it was reset at t1) + EXPECT_EQ(12, total); +} + TEST(FunctionScheduler, AddInvalid) { int total = 0; FunctionScheduler fs; -- 2.34.1