--- /dev/null
+/*
+ * Copyright 2015 Facebook, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include <folly/experimental/FunctionScheduler.h>
+#include <folly/ThreadName.h>
+#include <folly/Conv.h>
+#include <folly/String.h>
+
+#ifdef _POSIX_MONOTONIC_CLOCK
+#define FOLLY_TIME_MONOTONIC_CLOCK CLOCK_MONOTONIC
+#else
+#define FOLLY_TIME_MONOTONIC_CLOCK CLOCK_REALTIME
+#endif
+
+using namespace std;
+using std::chrono::seconds;
+using std::chrono::milliseconds;
+
+static milliseconds nowInMS() {
+ struct timespec ts /*= void*/;
+ if (clock_gettime(FOLLY_TIME_MONOTONIC_CLOCK, &ts)) {
+ // Only possible failures are EFAULT or EINVAL, both practically
+ // impossible. But an assert can't hurt.
+ assert(false);
+ }
+ return milliseconds(
+ static_cast<int64_t>(ts.tv_sec * 1000.0 + ts.tv_nsec / 1000000.0 + 0.5));
+}
+
+namespace folly {
+
+FunctionScheduler::FunctionScheduler() {
+}
+
+FunctionScheduler::~FunctionScheduler() {
+ // make sure to stop the thread (if running)
+ shutdown();
+}
+
+void FunctionScheduler::addFunction(const std::function<void()>& cb,
+ milliseconds interval,
+ StringPiece nameID,
+ milliseconds startDelay) {
+ LatencyDistribution latencyDistr(false, 0.0);
+ addFunctionInternal(cb, interval,
+ latencyDistr, nameID, startDelay);
+}
+
+void FunctionScheduler::addFunctionInternal(const std::function<void()>& cb,
+ milliseconds interval,
+ const LatencyDistribution& latencyDistr,
+ StringPiece nameID,
+ milliseconds startDelay) {
+ if (interval < milliseconds::zero()) {
+ throw std::invalid_argument("FunctionScheduler: "
+ "time interval must be non-negative");
+ }
+ if (startDelay < milliseconds::zero()) {
+ throw std::invalid_argument("FunctionScheduler: "
+ "start delay must be non-negative");
+ }
+
+ std::lock_guard<std::mutex> l(mutex_);
+ // check if the nameID is unique
+ for (const auto& f : functions_) {
+ if (f.isValid() && f.name == nameID) {
+ throw std::invalid_argument(to<string>(
+ "FunctionScheduler: a function named \"", nameID,
+ "\" already exists"));
+ }
+ }
+ if (currentFunction_ && currentFunction_->name == nameID) {
+ throw std::invalid_argument(to<string>(
+ "FunctionScheduler: a function named \"", nameID,
+ "\" already exists"));
+ }
+
+ functions_.emplace_back(cb, interval, nameID.str(), startDelay,
+ latencyDistr.isPoisson, latencyDistr.poissonMean);
+ if (running_) {
+ functions_.back().setNextRunTime(nowInMS() + startDelay);
+ std::push_heap(functions_.begin(), functions_.end(), fnCmp_);
+ // Signal the running thread to wake up and see if it needs to change it's
+ // current scheduling decision.
+ runningCondvar_.notify_one();
+ }
+}
+
+bool FunctionScheduler::cancelFunction(StringPiece nameID) {
+ bool retValue = false;
+ std::unique_lock<std::mutex> l(mutex_);
+
+ if (currentFunction_ && currentFunction_->name == nameID) {
+ // This function is currently being run. Clear currentFunction_
+ // The running thread will see this and won't reschedule the function.
+ currentFunction_ = nullptr;
+ return true;
+ }
+
+ for (auto it = functions_.begin(); it != functions_.end(); ++it) {
+ if (it->isValid() && it->name == nameID) {
+ cancelFunction(l, it);
+ return true;
+ }
+ }
+ return false;
+}
+
+void FunctionScheduler::cancelFunction(const std::unique_lock<std::mutex>& l,
+ FunctionHeap::iterator it) {
+ // This function should only be called with mutex_ already locked.
+ DCHECK(l.mutex() == &mutex_);
+ DCHECK(l.owns_lock());
+
+ if (running_) {
+ // Internally gcc has an __adjust_heap() function to fill in a hole in the
+ // heap. Unfortunately it isn't part of the standard API.
+ //
+ // For now we just leave the RepeatFunc in our heap, but mark it as unused.
+ // When it's nextTimeInterval comes up, the runner thread will pop it from
+ // the heap and simply throw it away.
+ it->cancel();
+ } else {
+ // We're not running, so functions_ doesn't need to be maintained in heap
+ // order.
+ functions_.erase(it);
+ }
+}
+
+void FunctionScheduler::cancelAllFunctions() {
+ std::unique_lock<std::mutex> l(mutex_);
+ functions_.clear();
+}
+
+bool FunctionScheduler::start() {
+ std::unique_lock<std::mutex> l(mutex_);
+ if (running_) {
+ return false;
+ }
+
+ running_ = true;
+
+ VLOG(1) << "Starting FunctionScheduler with " << functions_.size()
+ << " functions.";
+ milliseconds now(nowInMS());
+ // Reset the next run time. for all functions.
+ // note: this is needed since one can shutdown() and start() again
+ for (auto& f : functions_) {
+ f.setNextRunTime(now + f.startDelay);
+ VLOG(1) << " - func: "
+ << (f.name.empty() ? "(anon)" : f.name.c_str())
+ << ", period = " << f.timeInterval.count()
+ << "ms, delay = " << f.startDelay.count() << "ms";
+ }
+ std::make_heap(functions_.begin(), functions_.end(), fnCmp_);
+
+ thread_ = std::thread([&] { this->run(); });
+ return true;
+}
+
+void FunctionScheduler::shutdown() {
+ {
+ std::lock_guard<std::mutex> g(mutex_);
+ if (!running_) {
+ return;
+ }
+
+ running_ = false;
+ runningCondvar_.notify_one();
+ }
+ thread_.join();
+}
+
+void FunctionScheduler::run() {
+ std::unique_lock<std::mutex> lock(mutex_);
+
+ if (!threadName_.empty()) {
+ folly::setThreadName(threadName_);
+ google::setThreadName(threadName_);
+ }
+
+ while (running_) {
+ // If we have nothing to run, wait until a function is added or until we
+ // are stopped.
+ if (functions_.empty()) {
+ runningCondvar_.wait(lock);
+ continue;
+ }
+
+ milliseconds now(nowInMS());
+
+ // Move the next function to run to the end of functions_
+ std::pop_heap(functions_.begin(), functions_.end(), fnCmp_);
+
+ // Check to see if the function was cancelled.
+ // If so, just remove it and continue around the loop.
+ if (!functions_.back().isValid()) {
+ functions_.pop_back();
+ continue;
+ }
+
+ auto sleepTime = functions_.back().getNextRunTime() - now;
+ if (sleepTime < milliseconds::zero()) {
+ // We need to run this function now
+ runOneFunction(lock, now);
+ } else {
+ // Re-add the function to the heap, and wait until we actually
+ // need to run it.
+ std::push_heap(functions_.begin(), functions_.end(), fnCmp_);
+ runningCondvar_.wait_for(lock, sleepTime);
+ }
+ }
+}
+
+void FunctionScheduler::runOneFunction(std::unique_lock<std::mutex>& lock,
+ std::chrono::milliseconds now) {
+ DCHECK(lock.mutex() == &mutex_);
+ DCHECK(lock.owns_lock());
+
+ // The function to run will be at the end of functions_ already.
+ //
+ // Fully remove it from functions_ now.
+ // We need to release mutex_ while we invoke this function, and we need to
+ // maintain the heap property on functions_ while mutex_ is unlocked.
+ RepeatFunc func(std::move(functions_.back()));
+ functions_.pop_back();
+ currentFunction_ = &func;
+
+ // Update the function's run time, and re-insert it into the heap.
+ if (steady_) {
+ // This allows scheduler to catch up
+ func.lastRunTime += func.timeInterval;
+ } else {
+ // Note that we adjust lastRunTime to the current time where we started the
+ // function call, rather than the time when the function finishes.
+ // This ensures that we call the function once every time interval, as
+ // opposed to waiting time interval seconds between calls. (These can be
+ // different if the function takes a significant amount of time to run.)
+ func.lastRunTime = now;
+ }
+
+ // Release the lock while we invoke the user's function
+ lock.unlock();
+
+ // Invoke the function
+ try {
+ VLOG(5) << "Now running " << func.name;
+ func.cb();
+ } catch (const std::exception& ex) {
+ LOG(ERROR) << "Error running the scheduled function <"
+ << func.name << ">: " << exceptionStr(ex);
+ }
+
+ // Re-acquire the lock
+ lock.lock();
+
+ if (!currentFunction_) {
+ // The function was cancelled while we were running it.
+ // We shouldn't reschedule it;
+ return;
+ }
+ // Clear currentFunction_
+ CHECK_EQ(currentFunction_, &func);
+ currentFunction_ = nullptr;
+
+ // Re-insert the function into our functions_ heap.
+ // We only maintain the heap property while running_ is set. (running_ may
+ // have been cleared while we were invoking the user's function.)
+ if (func.isPoissonDistr) {
+ func.setTimeIntervalPoissonDistr();
+ }
+ functions_.push_back(std::move(func));
+ if (running_) {
+ std::push_heap(functions_.begin(), functions_.end(), fnCmp_);
+ }
+}
+
+void FunctionScheduler::setThreadName(StringPiece threadName) {
+ std::unique_lock<std::mutex> l(mutex_);
+ threadName_ = threadName.str();
+}
+
+}
--- /dev/null
+/*
+ * Copyright 2015 Facebook, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#ifndef FOLLY_EXPERIMENTAL_FUNCTION_SCHEDULER_H_
+#define FOLLY_EXPERIMENTAL_FUNCTION_SCHEDULER_H_
+
+#include <folly/Range.h>
+#include <chrono>
+#include <condition_variable>
+#include <mutex>
+#include <thread>
+#include <vector>
+#include <random>
+
+namespace folly {
+
+/**
+ * Schedules any number of functions to run at various intervals. E.g.,
+ *
+ * FunctionScheduler fs;
+ *
+ * fs.addFunction([&] { LOG(INFO) << "tick..."; }, seconds(1), "ticker");
+ * fs.addFunction(std::bind(&TestClass::doStuff, this), minutes(5), "stuff");
+ * fs.start();
+ * ........
+ * fs.cancelFunction("ticker");
+ * fs.addFunction([&] { LOG(INFO) << "tock..."; }, minutes(3), "tocker");
+ * ........
+ * fs.shutdown();
+ *
+ *
+ * Note: the class uses only one thread - if you want to use more than one
+ * thread use multiple FunctionScheduler objects
+ *
+ * start() schedules the functions, while shutdown() terminates further
+ * scheduling.
+ */
+class FunctionScheduler {
+ public:
+ FunctionScheduler();
+ ~FunctionScheduler();
+
+ /**
+ * By default steady is false, meaning schedules may lag behind overtime.
+ * This could be due to long running tasks or time drift because of randomness
+ * in thread wakeup time.
+ * By setting steady to true, FunctionScheduler will attempt to catch up.
+ * i.e. more like a cronjob
+ *
+ * NOTE: it's only safe to set this before calling start()
+ */
+ void setSteady(bool steady) { steady_ = steady; }
+
+ struct LatencyDistribution {
+ bool isPoisson;
+ double poissonMean;
+
+ LatencyDistribution(bool poisson,
+ double mean)
+ : isPoisson(poisson),
+ poissonMean(mean) {
+ }
+
+ };
+
+ /**
+ * Adds a new function to the FunctionScheduler.
+ *
+ * Functions will not be run until start() is called. When start() is
+ * called, each function will be run after its specified startDelay.
+ * Functions may also be added after start() has been called, in which case
+ * startDelay is still honored.
+ *
+ * Throws an exception on error. In particular, each function must have a
+ * unique name--two functions cannot be added with the same name.
+ */
+ void addFunction(const std::function<void()>& cb,
+ std::chrono::milliseconds interval,
+ StringPiece nameID = StringPiece(),
+ std::chrono::milliseconds startDelay =
+ std::chrono::milliseconds(0));
+
+ /**
+ * Cancels the function with the specified name, so it will no longer be run.
+ *
+ * Returns false if no function exists with the specified name.
+ */
+ bool cancelFunction(StringPiece nameID);
+
+ /**
+ * All functions registered will be canceled.
+ */
+ void cancelAllFunctions();
+
+ /**
+ * Starts the scheduler.
+ *
+ * Returns false if the scheduler was already running.
+ */
+ bool start();
+
+ /**
+ * Stops the FunctionScheduler.
+ *
+ * It may be restarted later by calling start() again.
+ */
+ void shutdown();
+
+ /**
+ * Set the name of the worker thread.
+ */
+ void setThreadName(StringPiece threadName);
+
+
+ private:
+ void addFunctionInternal(const std::function<void()>& cb,
+ std::chrono::milliseconds interval,
+ const LatencyDistribution& latencyDistr,
+ StringPiece nameID = StringPiece(),
+ std::chrono::milliseconds startDelay =
+ std::chrono::milliseconds(0));
+ struct RepeatFunc {
+ std::function<void()> cb;
+ std::chrono::milliseconds timeInterval;
+ std::chrono::milliseconds lastRunTime;
+ std::string name;
+ std::chrono::milliseconds startDelay;
+ bool isPoissonDistr;
+ std::default_random_engine generator;
+ std::poisson_distribution<int> poisson_random;
+
+ RepeatFunc(const std::function<void()>& cback,
+ std::chrono::milliseconds interval,
+ const std::string& nameID,
+ std::chrono::milliseconds delay,
+ bool poisson = false,
+ double meanPoisson = 1.0)
+ : cb(cback),
+ timeInterval(interval),
+ lastRunTime(0),
+ name(nameID),
+ startDelay(delay),
+ isPoissonDistr(poisson),
+ poisson_random(meanPoisson) {
+ }
+
+ std::chrono::milliseconds getNextRunTime() const {
+ return lastRunTime + timeInterval;
+ }
+ void setNextRunTime(std::chrono::milliseconds time) {
+ lastRunTime = time - timeInterval;
+ }
+ void setTimeIntervalPoissonDistr() {
+ if (isPoissonDistr) {
+ timeInterval = std::chrono::milliseconds(poisson_random(generator));
+ }
+ }
+ void cancel() {
+ // Simply reset cb to an empty function.
+ cb = std::function<void()>();
+ }
+ bool isValid() const {
+ return bool(cb);
+ }
+ };
+ struct RunTimeOrder {
+ bool operator()(const RepeatFunc& f1, const RepeatFunc& f2) const {
+ return f1.getNextRunTime() > f2.getNextRunTime();
+ }
+ };
+ typedef std::vector<RepeatFunc> FunctionHeap;
+
+ void run();
+ void runOneFunction(std::unique_lock<std::mutex>& lock,
+ std::chrono::milliseconds now);
+ void cancelFunction(const std::unique_lock<std::mutex> &lock,
+ FunctionHeap::iterator it);
+
+ std::thread thread_;
+
+ // Mutex to protect our member variables.
+ std::mutex mutex_;
+ bool running_{false};
+
+ // The functions to run.
+ // This is a heap, ordered by next run time.
+ FunctionHeap functions_;
+ RunTimeOrder fnCmp_;
+
+ // The function currently being invoked by the running thread.
+ // This is null when the running thread is idle
+ RepeatFunc* currentFunction_{nullptr};
+
+ // Condition variable that is signalled whenever a new function is added
+ // or when the FunctionScheduler is stopped.
+ std::condition_variable runningCondvar_;
+
+ std::string threadName_;
+ bool steady_{false};
+};
+
+}
+
+#endif
--- /dev/null
+/*
+ * Copyright 2015 Facebook, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#include <atomic>
+#include <gtest/gtest.h>
+
+#include <folly/experimental/FunctionScheduler.h>
+
+namespace folly {
+
+/*
+ * Helper functions for controlling how long this test takes.
+ *
+ * Using larger intervals here will make the tests less flaky when run on
+ * heavily loaded systems. However, this will also make the tests take longer
+ * to run.
+ */
+static const auto timeFactor = std::chrono::milliseconds(100);
+std::chrono::milliseconds testInterval(int n) {
+ return n * timeFactor;
+}
+void delay(int n) {
+ std::chrono::microseconds usec(n * timeFactor);
+ usleep(usec.count());
+}
+
+TEST(FunctionScheduler, SimpleAdd) {
+ int total = 0;
+ FunctionScheduler fs;
+ fs.addFunction([&] { total += 2; }, testInterval(2), "add2");
+ fs.start();
+ delay(1);
+ EXPECT_EQ(2, total);
+ fs.shutdown();
+ delay(2);
+ EXPECT_EQ(2, total);
+}
+
+TEST(FunctionScheduler, AddCancel) {
+ int total = 0;
+ FunctionScheduler fs;
+ fs.addFunction([&] { total += 2; }, testInterval(2), "add2");
+ fs.start();
+ delay(1);
+ EXPECT_EQ(2, total);
+ delay(2);
+ EXPECT_EQ(4, total);
+ EXPECT_TRUE(fs.cancelFunction("add2"));
+ EXPECT_FALSE(fs.cancelFunction("NO SUCH FUNC"));
+ delay(2);
+ EXPECT_EQ(4, total);
+ fs.addFunction([&] { total += 1; }, testInterval(2), "add2");
+ EXPECT_FALSE(fs.start()); // already running
+ delay(1);
+ EXPECT_EQ(5, total);
+ delay(2);
+ EXPECT_EQ(6, total);
+ fs.shutdown();
+}
+
+TEST(FunctionScheduler, AddCancel2) {
+ int total = 0;
+ FunctionScheduler fs;
+
+ // Test adds and cancels while the scheduler is stopped
+ EXPECT_FALSE(fs.cancelFunction("add2"));
+ fs.addFunction([&] { total += 1; }, testInterval(2), "add2");
+ EXPECT_TRUE(fs.cancelFunction("add2"));
+ EXPECT_FALSE(fs.cancelFunction("add2"));
+ fs.addFunction([&] { total += 2; }, testInterval(2), "add2");
+ fs.addFunction([&] { total += 3; }, testInterval(3), "add3");
+
+ EXPECT_EQ(0, total);
+ fs.start();
+ delay(1);
+ EXPECT_EQ(5, total);
+
+ // Cancel add2 while the scheduler is running
+ EXPECT_TRUE(fs.cancelFunction("add2"));
+ EXPECT_FALSE(fs.cancelFunction("add2"));
+ EXPECT_FALSE(fs.cancelFunction("bogus"));
+
+ delay(3);
+ EXPECT_EQ(8, total);
+ EXPECT_TRUE(fs.cancelFunction("add3"));
+
+ // Test a function that cancels itself
+ int selfCancelCount = 0;
+ fs.addFunction(
+ [&] {
+ ++selfCancelCount;
+ if (selfCancelCount > 2) {
+ fs.cancelFunction("selfCancel");
+ }
+ },
+ testInterval(1), "selfCancel", testInterval(1));
+ delay(4);
+ EXPECT_EQ(3, selfCancelCount);
+ EXPECT_FALSE(fs.cancelFunction("selfCancel"));
+
+ // Test a function that schedules another function
+ int adderCount = 0;
+ int fn2Count = 0;
+ auto fn2 = [&] { ++fn2Count; };
+ auto fnAdder = [&] {
+ ++adderCount;
+ if (adderCount == 2) {
+ fs.addFunction(fn2, testInterval(3), "fn2", testInterval(2));
+ }
+ };
+ fs.addFunction(fnAdder, testInterval(4), "adder");
+ // t0: adder fires
+ delay(1); // t1
+ EXPECT_EQ(1, adderCount);
+ EXPECT_EQ(0, fn2Count);
+ // t4: adder fires, schedules fn2
+ delay(4); // t5
+ EXPECT_EQ(2, adderCount);
+ EXPECT_EQ(0, fn2Count);
+ // t6: fn2 fires
+ delay(2); // t7
+ EXPECT_EQ(2, adderCount);
+ EXPECT_EQ(1, fn2Count);
+ // t8: adder fires
+ // t9: fn2 fires
+ delay(3); // t10
+ EXPECT_EQ(3, adderCount);
+ EXPECT_EQ(2, fn2Count);
+ EXPECT_TRUE(fs.cancelFunction("fn2"));
+ EXPECT_TRUE(fs.cancelFunction("adder"));
+ delay(5); // t10
+ EXPECT_EQ(3, adderCount);
+ EXPECT_EQ(2, fn2Count);
+
+ EXPECT_EQ(8, total);
+ EXPECT_EQ(3, selfCancelCount);
+}
+
+TEST(FunctionScheduler, AddMultiple) {
+ int total = 0;
+ FunctionScheduler fs;
+ fs.addFunction([&] { total += 2; }, testInterval(2), "add2");
+ fs.addFunction([&] { total += 3; }, testInterval(3), "add3");
+// function name already exists
+ EXPECT_THROW(fs.addFunction([&] { total += 2; }, testInterval(2),
+ "add2"), std::exception);
+
+ fs.start();
+ delay(1);
+ EXPECT_EQ(5, total);
+ delay(4);
+ EXPECT_EQ(12, total);
+ EXPECT_TRUE(fs.cancelFunction("add2"));
+ delay(2);
+ EXPECT_EQ(15, total);
+ fs.shutdown();
+ delay(3);
+ EXPECT_EQ(15, total);
+ fs.shutdown();
+}
+
+TEST(FunctionScheduler, AddAfterStart) {
+ int total = 0;
+ FunctionScheduler fs;
+ fs.addFunction([&] { total += 2; }, testInterval(2), "add2");
+ fs.addFunction([&] { total += 3; }, testInterval(2), "add3");
+ fs.start();
+ delay(3);
+ EXPECT_EQ(10, total);
+ fs.addFunction([&] { total += 2; }, testInterval(3), "add22");
+ delay(2);
+ EXPECT_EQ(17, total);
+}
+
+TEST(FunctionScheduler, ShutdownStart) {
+ int total = 0;
+ FunctionScheduler fs;
+ fs.addFunction([&] { total += 2; }, testInterval(2), "add2");
+ fs.start();
+ delay(1);
+ fs.shutdown();
+ fs.start();
+ delay(1);
+ EXPECT_EQ(4, total);
+ EXPECT_FALSE(fs.cancelFunction("add3")); // non existing
+ delay(2);
+ EXPECT_EQ(6, total);
+}
+
+TEST(FunctionScheduler, AddInvalid) {
+ int total = 0;
+ FunctionScheduler fs;
+ // interval may not be negative
+ EXPECT_THROW(fs.addFunction([&] { total += 2; }, testInterval(-1), "add2"),
+ std::exception);
+ EXPECT_FALSE(fs.cancelFunction("addNoFunc"));
+}
+
+TEST(FunctionScheduler, NoFunctions) {
+ FunctionScheduler fs;
+ EXPECT_TRUE(fs.start());
+ fs.shutdown();
+ FunctionScheduler fs2;
+ fs2.shutdown();
+}
+
+TEST(FunctionScheduler, AddWhileRunning) {
+ int total = 0;
+ FunctionScheduler fs;
+ fs.start();
+ delay(1);
+ fs.addFunction([&] { total += 2; }, testInterval(2), "add2");
+ // The function should be invoked nearly immediately when we add it
+ // and the FunctionScheduler is already running
+ usleep(50000);
+ EXPECT_EQ(2, total);
+ delay(2);
+ EXPECT_EQ(4, total);
+}
+
+TEST(FunctionScheduler, NoShutdown) {
+ int total = 0;
+ {
+ FunctionScheduler fs;
+ fs.addFunction([&] { total += 2; }, testInterval(1), "add2");
+ fs.start();
+ usleep(50000);
+ EXPECT_EQ(2, total);
+ }
+ // Destroyed the FunctionScheduler without calling shutdown.
+ // Everything should have been cleaned up, and the function will no longer
+ // get called.
+ delay(2);
+ EXPECT_EQ(2, total);
+}
+
+TEST(FunctionScheduler, StartDelay) {
+ int total = 0;
+ FunctionScheduler fs;
+ fs.addFunction([&] { total += 2; }, testInterval(2), "add2",
+ testInterval(2));
+ fs.addFunction([&] { total += 3; }, testInterval(3), "add3",
+ testInterval(2));
+ EXPECT_THROW(fs.addFunction([&] { total += 2; }, testInterval(3),
+ "addX", testInterval(-1)), std::exception);
+ fs.start();
+ delay(1); // t1
+ EXPECT_EQ(0, total);
+ // t2 : add2 total=2
+ // t2 : add3 total=5
+ delay(2); // t3
+ EXPECT_EQ(5, total);
+ // t4 : add2: total=7
+ // t5 : add3: total=10
+ // t6 : add2: total=12
+ delay(4); // t7
+ EXPECT_EQ(12, total);
+ fs.cancelFunction("add2");
+ // t8 : add3: total=15
+ delay(2); // t9
+ EXPECT_EQ(15, total);
+ fs.shutdown();
+ delay(3);
+ EXPECT_EQ(15, total);
+ fs.shutdown();
+}
+
+TEST(FunctionScheduler, NoSteadyCatchup) {
+ std::atomic<int> ticks(0);
+ FunctionScheduler fs;
+ fs.setThreadName("NoSteadyCatchup");
+ // fs.setSteady(false); is the default
+ fs.addFunction([&ticks] {
+ if (++ticks == 2) {
+ std::this_thread::sleep_for(
+ std::chrono::milliseconds(200));
+ }
+ },
+ std::chrono::milliseconds(5));
+ fs.start();
+ std::this_thread::sleep_for(std::chrono::milliseconds(500));
+
+ // no steady catch up means we'd tick once for 200ms, then remaining
+ // 300ms / 5 = 60 times
+ EXPECT_LE(ticks.load(), 61);
+}
+
+TEST(FunctionScheduler, SteadyCatchup) {
+ std::atomic<int> ticks(0);
+ FunctionScheduler fs;
+ fs.setThreadName("SteadyCatchup");
+ fs.setSteady(true);
+ fs.addFunction([&ticks] {
+ if (++ticks == 2) {
+ std::this_thread::sleep_for(
+ std::chrono::milliseconds(200));
+ }
+ },
+ std::chrono::milliseconds(5));
+ fs.start();
+
+ std::this_thread::sleep_for(std::chrono::milliseconds(500));
+
+ // tick every 5ms. Despite tick == 2 is slow, later ticks should be fast
+ // enough to catch back up to schedule
+ EXPECT_NEAR(100, ticks.load(), 10);
+}
+
+}