/*
- * Copyright 2016 Facebook, Inc.
+ * Copyright 2015-present Facebook, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-#include <folly/experimental/FunctionScheduler.h>
#include <algorithm>
#include <atomic>
#include <cassert>
#include <random>
+
+#include <boost/thread.hpp>
+
#include <folly/Random.h>
-#include <gtest/gtest.h>
+#include <folly/experimental/FunctionScheduler.h>
+#include <folly/portability/GTest.h>
+#include <folly/synchronization/Baton.h>
+
+#if defined(__linux__)
+#include <dlfcn.h>
+#endif
using namespace folly;
using std::chrono::milliseconds;
usleep(usec.count());
}
-} // unnamed namespace
+} // namespace
+
+TEST(FunctionScheduler, StartAndShutdown) {
+ FunctionScheduler fs;
+ EXPECT_TRUE(fs.start());
+ EXPECT_FALSE(fs.start());
+ EXPECT_TRUE(fs.shutdown());
+ EXPECT_FALSE(fs.shutdown());
+ // start again
+ EXPECT_TRUE(fs.start());
+ EXPECT_FALSE(fs.start());
+ EXPECT_TRUE(fs.shutdown());
+ EXPECT_FALSE(fs.shutdown());
+}
TEST(FunctionScheduler, SimpleAdd) {
int total = 0;
delay(2);
EXPECT_EQ(4, total);
fs.addFunction([&] { total += 1; }, testInterval(2), "add2");
- EXPECT_FALSE(fs.start()); // already running
delay(1);
EXPECT_EQ(5, total);
delay(2);
EXPECT_EQ(6, total);
}
+TEST(FunctionScheduler, ResetFunc) {
+ int total = 0;
+ FunctionScheduler fs;
+ fs.addFunction([&] { total += 2; }, testInterval(3), "add2");
+ fs.addFunction([&] { total += 3; }, testInterval(3), "add3");
+ fs.start();
+ delay(1);
+ EXPECT_EQ(5, total);
+ EXPECT_FALSE(fs.resetFunctionTimer("NON_EXISTING"));
+ EXPECT_TRUE(fs.resetFunctionTimer("add2"));
+ delay(1);
+ // t2: after the reset, add2 should have been invoked immediately
+ EXPECT_EQ(7, total);
+ usleep(150000);
+ // t3.5: add3 should have been invoked. add2 should not
+ EXPECT_EQ(10, total);
+ delay(1);
+ // t4.5: add2 should have been invoked once more (it was reset at t1)
+ EXPECT_EQ(12, total);
+}
+
+TEST(FunctionScheduler, ResetFuncWhileRunning) {
+ struct State {
+ boost::barrier barrier_a{2};
+ boost::barrier barrier_b{2};
+ boost::barrier barrier_c{2};
+ boost::barrier barrier_d{2};
+ bool set = false;
+ size_t count = 0;
+ };
+
+ State state; // held by ref
+ auto mv = std::make_shared<size_t>(); // gets moved
+
+ FunctionScheduler fs;
+ fs.addFunction(
+ [&, mv /* ref + shared_ptr fit in in-situ storage */] {
+ if (!state.set) { // first invocation
+ state.barrier_a.wait();
+ // ensure that resetFunctionTimer is called in this critical section
+ state.barrier_b.wait();
+ ++state.count;
+ EXPECT_TRUE(bool(mv)) << "bug repro: mv was moved-out";
+ state.barrier_c.wait();
+ // main thread checks count here
+ state.barrier_d.wait();
+ } else { // subsequent invocations
+ ++state.count;
+ }
+ },
+ testInterval(3),
+ "nada");
+ fs.start();
+
+ state.barrier_a.wait();
+ state.set = true;
+ fs.resetFunctionTimer("nada");
+ EXPECT_EQ(0, state.count) << "sanity check";
+ state.barrier_b.wait();
+ // fn thread increments count and checks mv here
+ state.barrier_c.wait();
+ EXPECT_EQ(1, state.count) << "sanity check";
+ state.barrier_d.wait();
+ delay(1);
+ EXPECT_EQ(2, state.count) << "sanity check";
+}
+
TEST(FunctionScheduler, AddInvalid) {
int total = 0;
FunctionScheduler fs;
delay(2);
EXPECT_EQ(6, total);
}
+
+TEST(FunctionScheduler, AddWithRunOnce) {
+ int total = 0;
+ FunctionScheduler fs;
+ fs.addFunctionOnce([&] { total += 2; }, "add2");
+ fs.start();
+ delay(1);
+ EXPECT_EQ(2, total);
+ delay(2);
+ EXPECT_EQ(2, total);
+
+ fs.addFunctionOnce([&] { total += 2; }, "add2");
+ delay(1);
+ EXPECT_EQ(4, total);
+ delay(2);
+ EXPECT_EQ(4, total);
+
+ fs.shutdown();
+}
+
+TEST(FunctionScheduler, cancelFunctionAndWait) {
+ int total = 0;
+ FunctionScheduler fs;
+ fs.addFunction(
+ [&] {
+ delay(5);
+ total += 2;
+ },
+ testInterval(100),
+ "add2");
+
+ fs.start();
+ delay(1);
+ EXPECT_EQ(0, total); // add2 is still sleeping
+
+ EXPECT_TRUE(fs.cancelFunctionAndWait("add2"));
+ EXPECT_EQ(2, total); // add2 should have completed
+
+ EXPECT_FALSE(fs.cancelFunction("add2")); // add2 has been canceled
+ fs.shutdown();
+}
+
+#if defined(__linux__)
+namespace {
+/**
+ * A helper class that forces our pthread_create() wrapper to fail when
+ * an PThreadCreateFailure object exists.
+ */
+class PThreadCreateFailure {
+ public:
+ PThreadCreateFailure() {
+ ++forceFailure_;
+ }
+ ~PThreadCreateFailure() {
+ --forceFailure_;
+ }
+
+ static bool shouldFail() {
+ return forceFailure_ > 0;
+ }
+
+ private:
+ static std::atomic<int> forceFailure_;
+};
+
+std::atomic<int> PThreadCreateFailure::forceFailure_{0};
+} // namespace
+
+// Replace the system pthread_create() function with our own stub, so we can
+// trigger failures in the StartThrows() test.
+extern "C" int pthread_create(
+ pthread_t* thread,
+ const pthread_attr_t* attr,
+ void* (*start_routine)(void*),
+ void* arg) {
+ static const auto realFunction = reinterpret_cast<decltype(&pthread_create)>(
+ dlsym(RTLD_NEXT, "pthread_create"));
+ // For sanity, make sure we didn't find ourself,
+ // since that would cause infinite recursion.
+ CHECK_NE(realFunction, pthread_create);
+
+ if (PThreadCreateFailure::shouldFail()) {
+ errno = EINVAL;
+ return -1;
+ }
+ return realFunction(thread, attr, start_routine, arg);
+}
+
+TEST(FunctionScheduler, StartThrows) {
+ FunctionScheduler fs;
+ PThreadCreateFailure fail;
+ EXPECT_ANY_THROW(fs.start());
+ EXPECT_NO_THROW(fs.shutdown());
+}
+#endif
+
+TEST(FunctionScheduler, cancelAllFunctionsAndWait) {
+ int total = 0;
+ FunctionScheduler fs;
+
+ fs.addFunction(
+ [&] {
+ delay(5);
+ total += 2;
+ },
+ testInterval(100),
+ "add2");
+
+ fs.start();
+ delay(1);
+ EXPECT_EQ(0, total); // add2 is still sleeping
+
+ fs.cancelAllFunctionsAndWait();
+ EXPECT_EQ(2, total);
+
+ EXPECT_FALSE(fs.cancelFunction("add2")); // add2 has been canceled
+ fs.shutdown();
+}
+
+TEST(FunctionScheduler, CancelAndWaitOnRunningFunc) {
+ folly::Baton<> baton;
+ std::thread th([&baton]() {
+ FunctionScheduler fs;
+ fs.addFunction([] { delay(10); }, testInterval(2), "func");
+ fs.start();
+ delay(1);
+ EXPECT_TRUE(fs.cancelFunctionAndWait("func"));
+ baton.post();
+ });
+
+ ASSERT_TRUE(baton.try_wait_for(testInterval(15)));
+ th.join();
+}
+
+TEST(FunctionScheduler, CancelAllAndWaitWithRunningFunc) {
+ folly::Baton<> baton;
+ std::thread th([&baton]() {
+ FunctionScheduler fs;
+ fs.addFunction([] { delay(10); }, testInterval(2), "func");
+ fs.start();
+ delay(1);
+ fs.cancelAllFunctionsAndWait();
+ baton.post();
+ });
+
+ ASSERT_TRUE(baton.try_wait_for(testInterval(15)));
+ th.join();
+}
+
+TEST(FunctionScheduler, CancelAllAndWaitWithOneRunningAndOneWaiting) {
+ folly::Baton<> baton;
+ std::thread th([&baton]() {
+ std::atomic<int> nExecuted(0);
+ FunctionScheduler fs;
+ fs.addFunction(
+ [&nExecuted] {
+ nExecuted++;
+ delay(10);
+ },
+ testInterval(2),
+ "func0");
+ fs.addFunction(
+ [&nExecuted] {
+ nExecuted++;
+ delay(10);
+ },
+ testInterval(2),
+ "func1",
+ testInterval(5));
+ fs.start();
+ delay(1);
+ fs.cancelAllFunctionsAndWait();
+ EXPECT_EQ(nExecuted, 1);
+ baton.post();
+ });
+
+ ASSERT_TRUE(baton.try_wait_for(testInterval(15)));
+ th.join();
+}
+
+TEST(FunctionScheduler, ConcurrentCancelFunctionAndWait) {
+ FunctionScheduler fs;
+ fs.addFunction([] { delay(10); }, testInterval(2), "func");
+
+ fs.start();
+ delay(1);
+ std::thread th1([&fs] { EXPECT_TRUE(fs.cancelFunctionAndWait("func")); });
+ delay(1);
+ std::thread th2([&fs] { EXPECT_FALSE(fs.cancelFunctionAndWait("func")); });
+ th1.join();
+ th2.join();
+}