From b35c3434677866f744d679f09a8429f86aba12a2 Mon Sep 17 00:00:00 2001 From: Andrei Alexandrescu Date: Thu, 12 Feb 2015 13:17:31 -0800 Subject: [PATCH] Migrate FunctionScheduler from common/concurrency/ to folly/experimental/ Summary: This are the open-source-related additions. A separate diff will replace the current implementation in common/concurrency/ to use this one. Test Plan: fbmake runtests Reviewed By: simpkins@fb.com Subscribers: trunkagent, net-systems@, folly-diffs@, yfeldblum FB internal diff: D1845525 Signature: t1:1845525:1424207291:d30e3c5e85222527c2aff39c1250aa1e41b9a2cf --- folly/experimental/FunctionScheduler.cpp | 296 +++++++++++++++++++++ folly/experimental/FunctionScheduler.h | 217 +++++++++++++++ folly/test/FunctionSchedulerTest.cpp | 321 +++++++++++++++++++++++ 3 files changed, 834 insertions(+) create mode 100644 folly/experimental/FunctionScheduler.cpp create mode 100644 folly/experimental/FunctionScheduler.h create mode 100644 folly/test/FunctionSchedulerTest.cpp diff --git a/folly/experimental/FunctionScheduler.cpp b/folly/experimental/FunctionScheduler.cpp new file mode 100644 index 00000000..24667fca --- /dev/null +++ b/folly/experimental/FunctionScheduler.cpp @@ -0,0 +1,296 @@ +/* + * Copyright 2015 Facebook, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include +#include +#include +#include + +#ifdef _POSIX_MONOTONIC_CLOCK +#define FOLLY_TIME_MONOTONIC_CLOCK CLOCK_MONOTONIC +#else +#define FOLLY_TIME_MONOTONIC_CLOCK CLOCK_REALTIME +#endif + +using namespace std; +using std::chrono::seconds; +using std::chrono::milliseconds; + +static milliseconds nowInMS() { + struct timespec ts /*= void*/; + if (clock_gettime(FOLLY_TIME_MONOTONIC_CLOCK, &ts)) { + // Only possible failures are EFAULT or EINVAL, both practically + // impossible. But an assert can't hurt. + assert(false); + } + return milliseconds( + static_cast(ts.tv_sec * 1000.0 + ts.tv_nsec / 1000000.0 + 0.5)); +} + +namespace folly { + +FunctionScheduler::FunctionScheduler() { +} + +FunctionScheduler::~FunctionScheduler() { + // make sure to stop the thread (if running) + shutdown(); +} + +void FunctionScheduler::addFunction(const std::function& cb, + milliseconds interval, + StringPiece nameID, + milliseconds startDelay) { + LatencyDistribution latencyDistr(false, 0.0); + addFunctionInternal(cb, interval, + latencyDistr, nameID, startDelay); +} + +void FunctionScheduler::addFunctionInternal(const std::function& cb, + milliseconds interval, + const LatencyDistribution& latencyDistr, + StringPiece nameID, + milliseconds startDelay) { + if (interval < milliseconds::zero()) { + throw std::invalid_argument("FunctionScheduler: " + "time interval must be non-negative"); + } + if (startDelay < milliseconds::zero()) { + throw std::invalid_argument("FunctionScheduler: " + "start delay must be non-negative"); + } + + std::lock_guard l(mutex_); + // check if the nameID is unique + for (const auto& f : functions_) { + if (f.isValid() && f.name == nameID) { + throw std::invalid_argument(to( + "FunctionScheduler: a function named \"", nameID, + "\" already exists")); + } + } + if (currentFunction_ && currentFunction_->name == nameID) { + throw std::invalid_argument(to( + "FunctionScheduler: a function named \"", nameID, + "\" already exists")); + } + + functions_.emplace_back(cb, interval, nameID.str(), startDelay, + latencyDistr.isPoisson, latencyDistr.poissonMean); + if (running_) { + functions_.back().setNextRunTime(nowInMS() + startDelay); + std::push_heap(functions_.begin(), functions_.end(), fnCmp_); + // Signal the running thread to wake up and see if it needs to change it's + // current scheduling decision. + runningCondvar_.notify_one(); + } +} + +bool FunctionScheduler::cancelFunction(StringPiece nameID) { + bool retValue = false; + std::unique_lock l(mutex_); + + if (currentFunction_ && currentFunction_->name == nameID) { + // This function is currently being run. Clear currentFunction_ + // The running thread will see this and won't reschedule the function. + currentFunction_ = nullptr; + return true; + } + + for (auto it = functions_.begin(); it != functions_.end(); ++it) { + if (it->isValid() && it->name == nameID) { + cancelFunction(l, it); + return true; + } + } + return false; +} + +void FunctionScheduler::cancelFunction(const std::unique_lock& l, + FunctionHeap::iterator it) { + // This function should only be called with mutex_ already locked. + DCHECK(l.mutex() == &mutex_); + DCHECK(l.owns_lock()); + + if (running_) { + // Internally gcc has an __adjust_heap() function to fill in a hole in the + // heap. Unfortunately it isn't part of the standard API. + // + // For now we just leave the RepeatFunc in our heap, but mark it as unused. + // When it's nextTimeInterval comes up, the runner thread will pop it from + // the heap and simply throw it away. + it->cancel(); + } else { + // We're not running, so functions_ doesn't need to be maintained in heap + // order. + functions_.erase(it); + } +} + +void FunctionScheduler::cancelAllFunctions() { + std::unique_lock l(mutex_); + functions_.clear(); +} + +bool FunctionScheduler::start() { + std::unique_lock l(mutex_); + if (running_) { + return false; + } + + running_ = true; + + VLOG(1) << "Starting FunctionScheduler with " << functions_.size() + << " functions."; + milliseconds now(nowInMS()); + // Reset the next run time. for all functions. + // note: this is needed since one can shutdown() and start() again + for (auto& f : functions_) { + f.setNextRunTime(now + f.startDelay); + VLOG(1) << " - func: " + << (f.name.empty() ? "(anon)" : f.name.c_str()) + << ", period = " << f.timeInterval.count() + << "ms, delay = " << f.startDelay.count() << "ms"; + } + std::make_heap(functions_.begin(), functions_.end(), fnCmp_); + + thread_ = std::thread([&] { this->run(); }); + return true; +} + +void FunctionScheduler::shutdown() { + { + std::lock_guard g(mutex_); + if (!running_) { + return; + } + + running_ = false; + runningCondvar_.notify_one(); + } + thread_.join(); +} + +void FunctionScheduler::run() { + std::unique_lock lock(mutex_); + + if (!threadName_.empty()) { + folly::setThreadName(threadName_); + google::setThreadName(threadName_); + } + + while (running_) { + // If we have nothing to run, wait until a function is added or until we + // are stopped. + if (functions_.empty()) { + runningCondvar_.wait(lock); + continue; + } + + milliseconds now(nowInMS()); + + // Move the next function to run to the end of functions_ + std::pop_heap(functions_.begin(), functions_.end(), fnCmp_); + + // Check to see if the function was cancelled. + // If so, just remove it and continue around the loop. + if (!functions_.back().isValid()) { + functions_.pop_back(); + continue; + } + + auto sleepTime = functions_.back().getNextRunTime() - now; + if (sleepTime < milliseconds::zero()) { + // We need to run this function now + runOneFunction(lock, now); + } else { + // Re-add the function to the heap, and wait until we actually + // need to run it. + std::push_heap(functions_.begin(), functions_.end(), fnCmp_); + runningCondvar_.wait_for(lock, sleepTime); + } + } +} + +void FunctionScheduler::runOneFunction(std::unique_lock& lock, + std::chrono::milliseconds now) { + DCHECK(lock.mutex() == &mutex_); + DCHECK(lock.owns_lock()); + + // The function to run will be at the end of functions_ already. + // + // Fully remove it from functions_ now. + // We need to release mutex_ while we invoke this function, and we need to + // maintain the heap property on functions_ while mutex_ is unlocked. + RepeatFunc func(std::move(functions_.back())); + functions_.pop_back(); + currentFunction_ = &func; + + // Update the function's run time, and re-insert it into the heap. + if (steady_) { + // This allows scheduler to catch up + func.lastRunTime += func.timeInterval; + } else { + // Note that we adjust lastRunTime to the current time where we started the + // function call, rather than the time when the function finishes. + // This ensures that we call the function once every time interval, as + // opposed to waiting time interval seconds between calls. (These can be + // different if the function takes a significant amount of time to run.) + func.lastRunTime = now; + } + + // Release the lock while we invoke the user's function + lock.unlock(); + + // Invoke the function + try { + VLOG(5) << "Now running " << func.name; + func.cb(); + } catch (const std::exception& ex) { + LOG(ERROR) << "Error running the scheduled function <" + << func.name << ">: " << exceptionStr(ex); + } + + // Re-acquire the lock + lock.lock(); + + if (!currentFunction_) { + // The function was cancelled while we were running it. + // We shouldn't reschedule it; + return; + } + // Clear currentFunction_ + CHECK_EQ(currentFunction_, &func); + currentFunction_ = nullptr; + + // Re-insert the function into our functions_ heap. + // We only maintain the heap property while running_ is set. (running_ may + // have been cleared while we were invoking the user's function.) + if (func.isPoissonDistr) { + func.setTimeIntervalPoissonDistr(); + } + functions_.push_back(std::move(func)); + if (running_) { + std::push_heap(functions_.begin(), functions_.end(), fnCmp_); + } +} + +void FunctionScheduler::setThreadName(StringPiece threadName) { + std::unique_lock l(mutex_); + threadName_ = threadName.str(); +} + +} diff --git a/folly/experimental/FunctionScheduler.h b/folly/experimental/FunctionScheduler.h new file mode 100644 index 00000000..08553fe5 --- /dev/null +++ b/folly/experimental/FunctionScheduler.h @@ -0,0 +1,217 @@ +/* + * Copyright 2015 Facebook, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#ifndef FOLLY_EXPERIMENTAL_FUNCTION_SCHEDULER_H_ +#define FOLLY_EXPERIMENTAL_FUNCTION_SCHEDULER_H_ + +#include +#include +#include +#include +#include +#include +#include + +namespace folly { + +/** + * Schedules any number of functions to run at various intervals. E.g., + * + * FunctionScheduler fs; + * + * fs.addFunction([&] { LOG(INFO) << "tick..."; }, seconds(1), "ticker"); + * fs.addFunction(std::bind(&TestClass::doStuff, this), minutes(5), "stuff"); + * fs.start(); + * ........ + * fs.cancelFunction("ticker"); + * fs.addFunction([&] { LOG(INFO) << "tock..."; }, minutes(3), "tocker"); + * ........ + * fs.shutdown(); + * + * + * Note: the class uses only one thread - if you want to use more than one + * thread use multiple FunctionScheduler objects + * + * start() schedules the functions, while shutdown() terminates further + * scheduling. + */ +class FunctionScheduler { + public: + FunctionScheduler(); + ~FunctionScheduler(); + + /** + * By default steady is false, meaning schedules may lag behind overtime. + * This could be due to long running tasks or time drift because of randomness + * in thread wakeup time. + * By setting steady to true, FunctionScheduler will attempt to catch up. + * i.e. more like a cronjob + * + * NOTE: it's only safe to set this before calling start() + */ + void setSteady(bool steady) { steady_ = steady; } + + struct LatencyDistribution { + bool isPoisson; + double poissonMean; + + LatencyDistribution(bool poisson, + double mean) + : isPoisson(poisson), + poissonMean(mean) { + } + + }; + + /** + * Adds a new function to the FunctionScheduler. + * + * Functions will not be run until start() is called. When start() is + * called, each function will be run after its specified startDelay. + * Functions may also be added after start() has been called, in which case + * startDelay is still honored. + * + * Throws an exception on error. In particular, each function must have a + * unique name--two functions cannot be added with the same name. + */ + void addFunction(const std::function& cb, + std::chrono::milliseconds interval, + StringPiece nameID = StringPiece(), + std::chrono::milliseconds startDelay = + std::chrono::milliseconds(0)); + + /** + * Cancels the function with the specified name, so it will no longer be run. + * + * Returns false if no function exists with the specified name. + */ + bool cancelFunction(StringPiece nameID); + + /** + * All functions registered will be canceled. + */ + void cancelAllFunctions(); + + /** + * Starts the scheduler. + * + * Returns false if the scheduler was already running. + */ + bool start(); + + /** + * Stops the FunctionScheduler. + * + * It may be restarted later by calling start() again. + */ + void shutdown(); + + /** + * Set the name of the worker thread. + */ + void setThreadName(StringPiece threadName); + + + private: + void addFunctionInternal(const std::function& cb, + std::chrono::milliseconds interval, + const LatencyDistribution& latencyDistr, + StringPiece nameID = StringPiece(), + std::chrono::milliseconds startDelay = + std::chrono::milliseconds(0)); + struct RepeatFunc { + std::function cb; + std::chrono::milliseconds timeInterval; + std::chrono::milliseconds lastRunTime; + std::string name; + std::chrono::milliseconds startDelay; + bool isPoissonDistr; + std::default_random_engine generator; + std::poisson_distribution poisson_random; + + RepeatFunc(const std::function& cback, + std::chrono::milliseconds interval, + const std::string& nameID, + std::chrono::milliseconds delay, + bool poisson = false, + double meanPoisson = 1.0) + : cb(cback), + timeInterval(interval), + lastRunTime(0), + name(nameID), + startDelay(delay), + isPoissonDistr(poisson), + poisson_random(meanPoisson) { + } + + std::chrono::milliseconds getNextRunTime() const { + return lastRunTime + timeInterval; + } + void setNextRunTime(std::chrono::milliseconds time) { + lastRunTime = time - timeInterval; + } + void setTimeIntervalPoissonDistr() { + if (isPoissonDistr) { + timeInterval = std::chrono::milliseconds(poisson_random(generator)); + } + } + void cancel() { + // Simply reset cb to an empty function. + cb = std::function(); + } + bool isValid() const { + return bool(cb); + } + }; + struct RunTimeOrder { + bool operator()(const RepeatFunc& f1, const RepeatFunc& f2) const { + return f1.getNextRunTime() > f2.getNextRunTime(); + } + }; + typedef std::vector FunctionHeap; + + void run(); + void runOneFunction(std::unique_lock& lock, + std::chrono::milliseconds now); + void cancelFunction(const std::unique_lock &lock, + FunctionHeap::iterator it); + + std::thread thread_; + + // Mutex to protect our member variables. + std::mutex mutex_; + bool running_{false}; + + // The functions to run. + // This is a heap, ordered by next run time. + FunctionHeap functions_; + RunTimeOrder fnCmp_; + + // The function currently being invoked by the running thread. + // This is null when the running thread is idle + RepeatFunc* currentFunction_{nullptr}; + + // Condition variable that is signalled whenever a new function is added + // or when the FunctionScheduler is stopped. + std::condition_variable runningCondvar_; + + std::string threadName_; + bool steady_{false}; +}; + +} + +#endif diff --git a/folly/test/FunctionSchedulerTest.cpp b/folly/test/FunctionSchedulerTest.cpp new file mode 100644 index 00000000..de1d2741 --- /dev/null +++ b/folly/test/FunctionSchedulerTest.cpp @@ -0,0 +1,321 @@ +/* + * Copyright 2015 Facebook, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +#include +#include + +#include + +namespace folly { + +/* + * Helper functions for controlling how long this test takes. + * + * Using larger intervals here will make the tests less flaky when run on + * heavily loaded systems. However, this will also make the tests take longer + * to run. + */ +static const auto timeFactor = std::chrono::milliseconds(100); +std::chrono::milliseconds testInterval(int n) { + return n * timeFactor; +} +void delay(int n) { + std::chrono::microseconds usec(n * timeFactor); + usleep(usec.count()); +} + +TEST(FunctionScheduler, SimpleAdd) { + int total = 0; + FunctionScheduler fs; + fs.addFunction([&] { total += 2; }, testInterval(2), "add2"); + fs.start(); + delay(1); + EXPECT_EQ(2, total); + fs.shutdown(); + delay(2); + EXPECT_EQ(2, total); +} + +TEST(FunctionScheduler, AddCancel) { + int total = 0; + FunctionScheduler fs; + fs.addFunction([&] { total += 2; }, testInterval(2), "add2"); + fs.start(); + delay(1); + EXPECT_EQ(2, total); + delay(2); + EXPECT_EQ(4, total); + EXPECT_TRUE(fs.cancelFunction("add2")); + EXPECT_FALSE(fs.cancelFunction("NO SUCH FUNC")); + delay(2); + EXPECT_EQ(4, total); + fs.addFunction([&] { total += 1; }, testInterval(2), "add2"); + EXPECT_FALSE(fs.start()); // already running + delay(1); + EXPECT_EQ(5, total); + delay(2); + EXPECT_EQ(6, total); + fs.shutdown(); +} + +TEST(FunctionScheduler, AddCancel2) { + int total = 0; + FunctionScheduler fs; + + // Test adds and cancels while the scheduler is stopped + EXPECT_FALSE(fs.cancelFunction("add2")); + fs.addFunction([&] { total += 1; }, testInterval(2), "add2"); + EXPECT_TRUE(fs.cancelFunction("add2")); + EXPECT_FALSE(fs.cancelFunction("add2")); + fs.addFunction([&] { total += 2; }, testInterval(2), "add2"); + fs.addFunction([&] { total += 3; }, testInterval(3), "add3"); + + EXPECT_EQ(0, total); + fs.start(); + delay(1); + EXPECT_EQ(5, total); + + // Cancel add2 while the scheduler is running + EXPECT_TRUE(fs.cancelFunction("add2")); + EXPECT_FALSE(fs.cancelFunction("add2")); + EXPECT_FALSE(fs.cancelFunction("bogus")); + + delay(3); + EXPECT_EQ(8, total); + EXPECT_TRUE(fs.cancelFunction("add3")); + + // Test a function that cancels itself + int selfCancelCount = 0; + fs.addFunction( + [&] { + ++selfCancelCount; + if (selfCancelCount > 2) { + fs.cancelFunction("selfCancel"); + } + }, + testInterval(1), "selfCancel", testInterval(1)); + delay(4); + EXPECT_EQ(3, selfCancelCount); + EXPECT_FALSE(fs.cancelFunction("selfCancel")); + + // Test a function that schedules another function + int adderCount = 0; + int fn2Count = 0; + auto fn2 = [&] { ++fn2Count; }; + auto fnAdder = [&] { + ++adderCount; + if (adderCount == 2) { + fs.addFunction(fn2, testInterval(3), "fn2", testInterval(2)); + } + }; + fs.addFunction(fnAdder, testInterval(4), "adder"); + // t0: adder fires + delay(1); // t1 + EXPECT_EQ(1, adderCount); + EXPECT_EQ(0, fn2Count); + // t4: adder fires, schedules fn2 + delay(4); // t5 + EXPECT_EQ(2, adderCount); + EXPECT_EQ(0, fn2Count); + // t6: fn2 fires + delay(2); // t7 + EXPECT_EQ(2, adderCount); + EXPECT_EQ(1, fn2Count); + // t8: adder fires + // t9: fn2 fires + delay(3); // t10 + EXPECT_EQ(3, adderCount); + EXPECT_EQ(2, fn2Count); + EXPECT_TRUE(fs.cancelFunction("fn2")); + EXPECT_TRUE(fs.cancelFunction("adder")); + delay(5); // t10 + EXPECT_EQ(3, adderCount); + EXPECT_EQ(2, fn2Count); + + EXPECT_EQ(8, total); + EXPECT_EQ(3, selfCancelCount); +} + +TEST(FunctionScheduler, AddMultiple) { + int total = 0; + FunctionScheduler fs; + fs.addFunction([&] { total += 2; }, testInterval(2), "add2"); + fs.addFunction([&] { total += 3; }, testInterval(3), "add3"); +// function name already exists + EXPECT_THROW(fs.addFunction([&] { total += 2; }, testInterval(2), + "add2"), std::exception); + + fs.start(); + delay(1); + EXPECT_EQ(5, total); + delay(4); + EXPECT_EQ(12, total); + EXPECT_TRUE(fs.cancelFunction("add2")); + delay(2); + EXPECT_EQ(15, total); + fs.shutdown(); + delay(3); + EXPECT_EQ(15, total); + fs.shutdown(); +} + +TEST(FunctionScheduler, AddAfterStart) { + int total = 0; + FunctionScheduler fs; + fs.addFunction([&] { total += 2; }, testInterval(2), "add2"); + fs.addFunction([&] { total += 3; }, testInterval(2), "add3"); + fs.start(); + delay(3); + EXPECT_EQ(10, total); + fs.addFunction([&] { total += 2; }, testInterval(3), "add22"); + delay(2); + EXPECT_EQ(17, total); +} + +TEST(FunctionScheduler, ShutdownStart) { + int total = 0; + FunctionScheduler fs; + fs.addFunction([&] { total += 2; }, testInterval(2), "add2"); + fs.start(); + delay(1); + fs.shutdown(); + fs.start(); + delay(1); + EXPECT_EQ(4, total); + EXPECT_FALSE(fs.cancelFunction("add3")); // non existing + delay(2); + EXPECT_EQ(6, total); +} + +TEST(FunctionScheduler, AddInvalid) { + int total = 0; + FunctionScheduler fs; + // interval may not be negative + EXPECT_THROW(fs.addFunction([&] { total += 2; }, testInterval(-1), "add2"), + std::exception); + EXPECT_FALSE(fs.cancelFunction("addNoFunc")); +} + +TEST(FunctionScheduler, NoFunctions) { + FunctionScheduler fs; + EXPECT_TRUE(fs.start()); + fs.shutdown(); + FunctionScheduler fs2; + fs2.shutdown(); +} + +TEST(FunctionScheduler, AddWhileRunning) { + int total = 0; + FunctionScheduler fs; + fs.start(); + delay(1); + fs.addFunction([&] { total += 2; }, testInterval(2), "add2"); + // The function should be invoked nearly immediately when we add it + // and the FunctionScheduler is already running + usleep(50000); + EXPECT_EQ(2, total); + delay(2); + EXPECT_EQ(4, total); +} + +TEST(FunctionScheduler, NoShutdown) { + int total = 0; + { + FunctionScheduler fs; + fs.addFunction([&] { total += 2; }, testInterval(1), "add2"); + fs.start(); + usleep(50000); + EXPECT_EQ(2, total); + } + // Destroyed the FunctionScheduler without calling shutdown. + // Everything should have been cleaned up, and the function will no longer + // get called. + delay(2); + EXPECT_EQ(2, total); +} + +TEST(FunctionScheduler, StartDelay) { + int total = 0; + FunctionScheduler fs; + fs.addFunction([&] { total += 2; }, testInterval(2), "add2", + testInterval(2)); + fs.addFunction([&] { total += 3; }, testInterval(3), "add3", + testInterval(2)); + EXPECT_THROW(fs.addFunction([&] { total += 2; }, testInterval(3), + "addX", testInterval(-1)), std::exception); + fs.start(); + delay(1); // t1 + EXPECT_EQ(0, total); + // t2 : add2 total=2 + // t2 : add3 total=5 + delay(2); // t3 + EXPECT_EQ(5, total); + // t4 : add2: total=7 + // t5 : add3: total=10 + // t6 : add2: total=12 + delay(4); // t7 + EXPECT_EQ(12, total); + fs.cancelFunction("add2"); + // t8 : add3: total=15 + delay(2); // t9 + EXPECT_EQ(15, total); + fs.shutdown(); + delay(3); + EXPECT_EQ(15, total); + fs.shutdown(); +} + +TEST(FunctionScheduler, NoSteadyCatchup) { + std::atomic ticks(0); + FunctionScheduler fs; + fs.setThreadName("NoSteadyCatchup"); + // fs.setSteady(false); is the default + fs.addFunction([&ticks] { + if (++ticks == 2) { + std::this_thread::sleep_for( + std::chrono::milliseconds(200)); + } + }, + std::chrono::milliseconds(5)); + fs.start(); + std::this_thread::sleep_for(std::chrono::milliseconds(500)); + + // no steady catch up means we'd tick once for 200ms, then remaining + // 300ms / 5 = 60 times + EXPECT_LE(ticks.load(), 61); +} + +TEST(FunctionScheduler, SteadyCatchup) { + std::atomic ticks(0); + FunctionScheduler fs; + fs.setThreadName("SteadyCatchup"); + fs.setSteady(true); + fs.addFunction([&ticks] { + if (++ticks == 2) { + std::this_thread::sleep_for( + std::chrono::milliseconds(200)); + } + }, + std::chrono::milliseconds(5)); + fs.start(); + + std::this_thread::sleep_for(std::chrono::milliseconds(500)); + + // tick every 5ms. Despite tick == 2 is slow, later ticks should be fast + // enough to catch back up to schedule + EXPECT_NEAR(100, ticks.load(), 10); +} + +} -- 2.34.1