2 * Copyright 2015-present Facebook, Inc.
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
8 * http://www.apache.org/licenses/LICENSE-2.0
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
22 #include <boost/thread.hpp>
24 #include <folly/Random.h>
25 #include <folly/experimental/FunctionScheduler.h>
26 #include <folly/portability/GTest.h>
27 #include <folly/synchronization/Baton.h>
29 #if defined(__linux__)
33 using namespace folly;
34 using std::chrono::milliseconds;
39 * Helper functions for controlling how long this test takes.
41 * Using larger intervals here will make the tests less flaky when run on
42 * heavily loaded systems. However, this will also make the tests take longer to run.
// Base time unit ("tick") used throughout this test file: 100 ms.
// testInterval(n) converts a tick count n into a std::chrono::milliseconds
// value by multiplying it with timeFactor.
45 static const auto timeFactor = std::chrono::milliseconds(100);
46 std::chrono::milliseconds testInterval(int n) { return n * timeFactor; }
// NOTE(review): the body of getTicksWithinRange is not visible in this
// listing; judging by its name and parameters it presumably bounds n to
// [min, max] -- confirm against the full file.
47 int getTicksWithinRange(int n, int min, int max) {
// NOTE(review): the statement below builds a microseconds duration of n ticks
// (n * timeFactor). It may belong to a separate helper whose header is not
// visible here (a delay() helper is called elsewhere in this file) -- confirm.
54 std::chrono::microseconds usec(n * timeFactor);
// Checks the boolean results of start() and shutdown(): the first call of
// each is expected to return true and an immediately repeated call false.
// The same start/start/shutdown/shutdown sequence is then run a second time
// on the same scheduler object.
60 TEST(FunctionScheduler, StartAndShutdown) {
62 EXPECT_TRUE(fs.start());
63 EXPECT_FALSE(fs.start());
64 EXPECT_TRUE(fs.shutdown());
65 EXPECT_FALSE(fs.shutdown());
67 EXPECT_TRUE(fs.start());
68 EXPECT_FALSE(fs.start());
69 EXPECT_TRUE(fs.shutdown());
70 EXPECT_FALSE(fs.shutdown());
// Registers a function named "add2" that adds 2 to `total`, with an interval
// of testInterval(2).
// NOTE(review): the remaining statements of this test are not visible in
// this listing.
73 TEST(FunctionScheduler, SimpleAdd) {
76 fs.addFunction([&] { total += 2; }, testInterval(2), "add2");
// Adds "add2", expects cancelFunction("add2") to return true and cancelling
// an unknown name to return false, then registers a new function under the
// same name "add2" (this one adds 1 instead of 2).
85 TEST(FunctionScheduler, AddCancel) {
88 fs.addFunction([&] { total += 2; }, testInterval(2), "add2");
94 EXPECT_TRUE(fs.cancelFunction("add2"));
95 EXPECT_FALSE(fs.cancelFunction("NO SUCH FUNC"));
98 fs.addFunction([&] { total += 1; }, testInterval(2), "add2");
// Exercises cancelFunction() in several situations, following the section
// comments below: while the scheduler is stopped, while it is running, from
// inside the scheduled function itself ("selfCancel"), and in combination
// with a function ("adder") that registers another function ("fn2").
// Cancelling a name that is not currently registered is expected to return
// false throughout.
106 TEST(FunctionScheduler, AddCancel2) {
108 FunctionScheduler fs;
110 // Test adds and cancels while the scheduler is stopped
111 EXPECT_FALSE(fs.cancelFunction("add2"));
112 fs.addFunction([&] { total += 1; }, testInterval(2), "add2");
113 EXPECT_TRUE(fs.cancelFunction("add2"));
114 EXPECT_FALSE(fs.cancelFunction("add2"));
115 fs.addFunction([&] { total += 2; }, testInterval(2), "add2");
116 fs.addFunction([&] { total += 3; }, testInterval(3), "add3");
123 // Cancel add2 while the scheduler is running
124 EXPECT_TRUE(fs.cancelFunction("add2"));
125 EXPECT_FALSE(fs.cancelFunction("add2"));
126 EXPECT_FALSE(fs.cancelFunction("bogus"));
130 EXPECT_TRUE(fs.cancelFunction("add3"));
132 // Test a function that cancels itself
133 int selfCancelCount = 0;
// The callback cancels its own registration once the counter exceeds 2; the
// assertions below expect exactly 3 invocations and a false result when
// cancelling "selfCancel" again afterwards.
137 if (selfCancelCount > 2) {
138 fs.cancelFunction("selfCancel");
141 testInterval(1), "selfCancel", testInterval(1));
143 EXPECT_EQ(3, selfCancelCount);
144 EXPECT_FALSE(fs.cancelFunction("selfCancel"));
146 // Test a function that schedules another function
149 auto fn2 = [&] { ++fn2Count; };
// When adderCount reaches 2, "fn2" is registered with interval
// testInterval(3) and a trailing testInterval(2) argument.
152 if (adderCount == 2) {
153 fs.addFunction(fn2, testInterval(3), "fn2", testInterval(2));
156 fs.addFunction(fnAdder, testInterval(4), "adder");
159 EXPECT_EQ(1, adderCount);
160 EXPECT_EQ(0, fn2Count);
161 // t4: adder fires, schedules fn2
163 EXPECT_EQ(2, adderCount);
164 EXPECT_EQ(0, fn2Count);
167 EXPECT_EQ(2, adderCount);
168 EXPECT_EQ(1, fn2Count);
172 EXPECT_EQ(3, adderCount);
173 EXPECT_EQ(2, fn2Count);
174 EXPECT_TRUE(fs.cancelFunction("fn2"));
175 EXPECT_TRUE(fs.cancelFunction("adder"));
// After both functions are cancelled the counters are expected to stay put.
177 EXPECT_EQ(3, adderCount);
178 EXPECT_EQ(2, fn2Count);
181 EXPECT_EQ(3, selfCancelCount);
// Registers "add2" (interval testInterval(2)) and "add3" (interval
// testInterval(3)); registering a second function under the existing name
// "add2" is expected to throw std::invalid_argument. The test then checks
// `total` before and after cancelling "add2".
184 TEST(FunctionScheduler, AddMultiple) {
186 FunctionScheduler fs;
187 fs.addFunction([&] { total += 2; }, testInterval(2), "add2");
188 fs.addFunction([&] { total += 3; }, testInterval(3), "add3");
189 EXPECT_THROW(fs.addFunction([&] { total += 2; }, testInterval(2), "add2"),
190 std::invalid_argument); // function name already exists
196 EXPECT_EQ(12, total);
197 EXPECT_TRUE(fs.cancelFunction("add2"));
199 EXPECT_EQ(15, total);
202 EXPECT_EQ(15, total);
// Registers "add2" and "add3" (both with interval testInterval(2)), expects
// `total` to be 10, then registers a third function "add22" with interval
// testInterval(3) and expects `total` to be 17.
// NOTE(review): the start/delay statements between these steps are not
// visible in this listing.
206 TEST(FunctionScheduler, AddAfterStart) {
208 FunctionScheduler fs;
209 fs.addFunction([&] { total += 2; }, testInterval(2), "add2");
210 fs.addFunction([&] { total += 3; }, testInterval(2), "add3");
213 EXPECT_EQ(10, total);
214 fs.addFunction([&] { total += 2; }, testInterval(3), "add22");
216 EXPECT_EQ(17, total);
// Registers "add2" and, among other steps not visible in this listing,
// expects cancelFunction() on the never-registered name "add3" to return
// false.
219 TEST(FunctionScheduler, ShutdownStart) {
221 FunctionScheduler fs;
222 fs.addFunction([&] { total += 2; }, testInterval(2), "add2");
229 EXPECT_FALSE(fs.cancelFunction("add3")); // non existing
// Exercises resetFunctionTimer(): it is expected to return false for an
// unknown name and true for the registered "add2". The timeline comments
// below describe when each function is expected to have run after the reset.
234 TEST(FunctionScheduler, ResetFunc) {
236 FunctionScheduler fs;
237 fs.addFunction([&] { total += 2; }, testInterval(3), "add2");
238 fs.addFunction([&] { total += 3; }, testInterval(3), "add3");
242 EXPECT_FALSE(fs.resetFunctionTimer("NON_EXISTING"));
243 EXPECT_TRUE(fs.resetFunctionTimer("add2"));
245 // t2: after the reset, add2 should have been invoked immediately
248 // t3.5: add3 should have been invoked. add2 should not
249 EXPECT_EQ(10, total);
251 // t4.5: add2 should have been invoked once more (it was reset at t1)
252 EXPECT_EQ(12, total);
// Calls resetFunctionTimer() while the scheduled function's first invocation
// is still in progress. The function and the main thread are kept in
// lock-step by four two-party boost::barrier objects (a..d), reached through
// `state`. The function asserts that its captured shared_ptr `mv` is still
// non-null after the reset call (the "bug repro" message below).
255 TEST(FunctionScheduler, ResetFuncWhileRunning) {
257 boost::barrier barrier_a{2};
258 boost::barrier barrier_b{2};
259 boost::barrier barrier_c{2};
260 boost::barrier barrier_d{2};
265 State state; // held by ref
266 auto mv = std::make_shared<size_t>(); // gets moved
268 FunctionScheduler fs;
270 [&, mv /* ref + shared_ptr fit in in-situ storage */] {
271 if (!state.set) { // first invocation
272 state.barrier_a.wait();
273 // ensure that resetFunctionTimer is called in this critical section
274 state.barrier_b.wait();
276 EXPECT_TRUE(bool(mv)) << "bug repro: mv was moved-out";
277 state.barrier_c.wait();
278 // main thread checks count here
279 state.barrier_d.wait();
280 } else { // subsequent invocations
// Main-thread side of the handshake: each wait below pairs with the wait on
// the same barrier inside the function above.
288 state.barrier_a.wait();
290 fs.resetFunctionTimer("nada");
291 EXPECT_EQ(0, state.count) << "sanity check";
292 state.barrier_b.wait();
293 // fn thread increments count and checks mv here
294 state.barrier_c.wait();
295 EXPECT_EQ(1, state.count) << "sanity check";
296 state.barrier_d.wait();
298 EXPECT_EQ(2, state.count) << "sanity check";
// addFunction() with a negative interval (testInterval(-1)) is expected to
// throw std::invalid_argument; cancelling the never-registered name
// "addNoFunc" is expected to return false.
301 TEST(FunctionScheduler, AddInvalid) {
303 FunctionScheduler fs;
304 // interval may not be negative
305 EXPECT_THROW(fs.addFunction([&] { total += 2; }, testInterval(-1), "add2"),
306 std::invalid_argument);
308 EXPECT_FALSE(fs.cancelFunction("addNoFunc"));
// Starts a scheduler that has no functions registered and expects start() to
// return true. A second scheduler, fs2, is also declared.
// NOTE(review): what is done with fs2 is not visible in this listing.
311 TEST(FunctionScheduler, NoFunctions) {
312 FunctionScheduler fs;
313 EXPECT_TRUE(fs.start());
315 FunctionScheduler fs2;
// Registers "add2" on a scheduler that, per the comment below, is already
// running at that point.
// NOTE(review): the start() call and the assertions are not visible in this
// listing.
319 TEST(FunctionScheduler, AddWhileRunning) {
321 FunctionScheduler fs;
324 fs.addFunction([&] { total += 2; }, testInterval(2), "add2");
325 // The function should be invoked nearly immediately when we add it
326 // and the FunctionScheduler is already running
// Registers "add2" with interval testInterval(1) and, per the comments below,
// lets the FunctionScheduler be destroyed without an explicit shutdown().
// NOTE(review): the scope handling and assertions are not visible in this
// listing, and the final comment line is cut off.
333 TEST(FunctionScheduler, NoShutdown) {
336 FunctionScheduler fs;
337 fs.addFunction([&] { total += 2; }, testInterval(1), "add2");
342 // Destroyed the FunctionScheduler without calling shutdown.
343 // Everything should have been cleaned up, and the function will no longer
// Registers "add2" and "add3" using the addFunction() overload that takes an
// extra trailing argument (presumably the start delay, given the test name;
// the actual values are not visible in this listing). Passing
// testInterval(-1) in that position is expected to throw
// std::invalid_argument. The timeline comments give the expected `total`
// at each tick.
349 TEST(FunctionScheduler, StartDelay) {
351 FunctionScheduler fs;
352 fs.addFunction([&] { total += 2; }, testInterval(2), "add2",
354 fs.addFunction([&] { total += 3; }, testInterval(3), "add3",
356 EXPECT_THROW(fs.addFunction([&] { total += 2; }, testInterval(3),
357 "addX", testInterval(-1)),
358 std::invalid_argument);
366 // t4 : add2: total=7
367 // t5 : add3: total=10
368 // t6 : add2: total=12
370 EXPECT_EQ(12, total);
371 fs.cancelFunction("add2");
372 // t8 : add3: total=15
374 EXPECT_EQ(15, total);
377 EXPECT_EQ(15, total);
// With the default (non-steady) mode, a callback that contains a 200 ms sleep
// is scheduled, the test thread sleeps 500 ms, and the tick counter is then
// expected to be at most 61 (see the arithmetic in the comment below).
// NOTE(review): the condition guarding the 200 ms sleep and the interval
// passed to addFunction are not visible in this listing.
381 TEST(FunctionScheduler, NoSteadyCatchup) {
382 std::atomic<int> ticks(0);
383 FunctionScheduler fs;
384 // fs.setSteady(false); is the default
385 fs.addFunction([&ticks] {
387 std::this_thread::sleep_for(
388 std::chrono::milliseconds(200));
393 std::this_thread::sleep_for(std::chrono::milliseconds(500));
395 // no steady catch up means we'd tick once for 200ms, then remaining
396 // 300ms / 5 = 60 times
397 EXPECT_LE(ticks.load(), 61);
// Counterpart of NoSteadyCatchup: after the test thread sleeps 500 ms the
// tick counter is expected to be within 10 of 100, i.e. the scheduler is
// expected to catch up after the slow invocation.
// NOTE(review): the call that enables steady mode and the condition guarding
// the 200 ms sleep are not visible in this listing.
400 TEST(FunctionScheduler, SteadyCatchup) {
401 std::atomic<int> ticks(0);
402 FunctionScheduler fs;
404 fs.addFunction([&ticks] {
406 std::this_thread::sleep_for(
407 std::chrono::milliseconds(200));
413 std::this_thread::sleep_for(std::chrono::milliseconds(500));
415 // tick every 5ms. Although the tick == 2 invocation is slow, later ticks
416 // should be fast enough to catch back up to schedule
417 EXPECT_NEAR(100, ticks.load(), 10);
// Builds an interval range of testInterval(kTicks) -/+ timeFactor / 5 (with
// kTicks = 2) and registers a function named "UniformDistribution" through
// addFunctionUniformDistribution(), with a final argument of 0 ms.
// NOTE(review): the arguments on the missing lines (presumably minInterval
// and maxInterval) and the assertions are not visible in this listing.
420 TEST(FunctionScheduler, UniformDistribution) {
422 const int kTicks = 2;
423 std::chrono::milliseconds minInterval =
424 testInterval(kTicks) - (timeFactor / 5);
425 std::chrono::milliseconds maxInterval =
426 testInterval(kTicks) + (timeFactor / 5);
427 FunctionScheduler fs;
428 fs.addFunctionUniformDistribution([&] { total += 2; },
431 "UniformDistribution",
432 std::chrono::milliseconds(0));
// Registers a function through addFunctionGenericDistribution() with a custom
// interval functor. The functor holds its own copy of nextInterval (captured
// by value, lambda is mutable), publishes the interval it is about to return
// through expectedInterval (captured by reference), and then squares its
// copy, so successive calls return 2, 4, 16, ... ticks.
// The test thread then waits with delay(expectedInterval).
445 TEST(FunctionScheduler, ExponentialBackoff) {
447 int expectedInterval = 0;
448 int nextInterval = 2;
449 FunctionScheduler fs;
450 fs.addFunctionGenericDistribution(
452 [&expectedInterval, nextInterval]() mutable {
453 expectedInterval = nextInterval;
454 nextInterval *= nextInterval;
455 return testInterval(expectedInterval);
457 "ExponentialBackoff",
459 std::chrono::milliseconds(0));
463 delay(expectedInterval);
465 delay(expectedInterval);
// Registers a function through addFunctionGenericDistribution() whose interval
// functor draws from std::gamma_distribution<double>(2.0, 2.0), using an
// engine seeded from folly::Random::rand32(). The draw is truncated to int and
// passed to getTicksWithinRange() with bounds 2 and 10, and the functor
// returns testInterval(expectedInterval).
// NOTE(review): the assignment that stores the result into expectedInterval
// is on a line not visible in this listing.
472 TEST(FunctionScheduler, GammaIntervalDistribution) {
474 int expectedInterval = 0;
475 FunctionScheduler fs;
476 std::default_random_engine generator(folly::Random::rand32());
477 // The alpha and beta arguments are selected, somewhat randomly, to be 2.0.
478 // These values do not matter much in this test, as we are not testing the
479 // std::gamma_distribution itself...
480 std::gamma_distribution<double> gamma(2.0, 2.0);
481 fs.addFunctionGenericDistribution(
483 [&expectedInterval, generator, gamma]() mutable {
485 getTicksWithinRange(static_cast<int>(gamma(generator)), 2, 10);
486 return testInterval(expectedInterval);
489 "gamma(2.0,2.0)*100ms",
490 std::chrono::milliseconds(0));
494 delay(expectedInterval);
496 delay(expectedInterval);
// Uses addFunctionOnce() to register "add2"; later in the test the same name
// is registered through addFunctionOnce() a second time.
// NOTE(review): the statements between the two registrations and the
// assertions are not visible in this listing.
503 TEST(FunctionScheduler, AddWithRunOnce) {
505 FunctionScheduler fs;
506 fs.addFunctionOnce([&] { total += 2; }, "add2");
513 fs.addFunctionOnce([&] { total += 2; }, "add2");
// Expects `total` to still be 0 before cancelFunctionAndWait("add2") is
// called and to be 2 right after it returns true, i.e. the call is expected
// to return only once the in-progress "add2" invocation has finished. A
// following cancelFunction("add2") is expected to return false.
// NOTE(review): the registration of "add2" is not visible in this listing.
522 TEST(FunctionScheduler, cancelFunctionAndWait) {
524 FunctionScheduler fs;
535 EXPECT_EQ(0, total); // add2 is still sleeping
537 EXPECT_TRUE(fs.cancelFunctionAndWait("add2"));
538 EXPECT_EQ(2, total); // add2 should have completed
540 EXPECT_FALSE(fs.cancelFunction("add2")); // add2 has been canceled
544 #if defined(__linux__)
547 * A helper class that forces our pthread_create() wrapper to fail when
548 * a PThreadCreateFailure object exists.
// Failure switch consulted by the pthread_create() stub in this file.
// shouldFail() reports true while the static atomic counter forceFailure_ is
// greater than zero; the counter starts at 0.
// NOTE(review): the constructor and destructor bodies are not visible in this
// listing; presumably they increment and decrement forceFailure_ -- confirm.
550 class PThreadCreateFailure {
552 PThreadCreateFailure() {
555 ~PThreadCreateFailure() {
559 static bool shouldFail() {
560 return forceFailure_ > 0;
564 static std::atomic<int> forceFailure_;
567 std::atomic<int> PThreadCreateFailure::forceFailure_{0};
570 // Replace the system pthread_create() function with our own stub, so we can
571 // trigger failures in the StartThrows() test.
// The next pthread_create definition is looked up once via
// dlsym(RTLD_NEXT, ...) and cached in a function-local static. Unless
// PThreadCreateFailure::shouldFail() is true, the call is forwarded to it
// unchanged.
// NOTE(review): the statements executed when shouldFail() is true are not
// visible in this listing.
572 extern "C" int pthread_create(
574 const pthread_attr_t* attr,
575 void* (*start_routine)(void*),
577 static const auto realFunction = reinterpret_cast<decltype(&pthread_create)>(
578 dlsym(RTLD_NEXT, "pthread_create"));
579 // For sanity, make sure we didn't find ourselves,
580 // since that would cause infinite recursion.
581 CHECK_NE(realFunction, pthread_create);
583 if (PThreadCreateFailure::shouldFail()) {
587 return realFunction(thread, attr, start_routine, arg);
// While a PThreadCreateFailure object is alive, start() is expected to throw
// (any exception type), and a subsequent shutdown() is expected not to throw.
590 TEST(FunctionScheduler, StartThrows) {
591 FunctionScheduler fs;
592 PThreadCreateFailure fail;
593 EXPECT_ANY_THROW(fs.start());
594 EXPECT_NO_THROW(fs.shutdown());
// Expects `total` to still be 0 before cancelAllFunctionsAndWait() is called;
// afterwards cancelFunction("add2") is expected to return false because the
// function has already been cancelled.
// NOTE(review): the registration of "add2" and the assertion directly after
// the cancel call are not visible in this listing.
598 TEST(FunctionScheduler, cancelAllFunctionsAndWait) {
600 FunctionScheduler fs;
612 EXPECT_EQ(0, total); // add2 is still sleeping
614 fs.cancelAllFunctionsAndWait();
617 EXPECT_FALSE(fs.cancelFunction("add2")); // add2 has been canceled
// Runs the scenario on a helper thread: "func" (whose body is delay(10)) is
// registered with interval testInterval(2), and cancelFunctionAndWait("func")
// is expected to return true. The main thread asserts that waiting on the
// baton succeeds within testInterval(15).
// NOTE(review): the start() call and the baton post are not visible in this
// listing.
621 TEST(FunctionScheduler, CancelAndWaitOnRunningFunc) {
622 folly::Baton<> baton;
623 std::thread th([&baton]() {
624 FunctionScheduler fs;
625 fs.addFunction([] { delay(10); }, testInterval(2), "func");
628 EXPECT_TRUE(fs.cancelFunctionAndWait("func"));
632 ASSERT_TRUE(baton.try_wait_for(testInterval(15)));
// Same shape as CancelAndWaitOnRunningFunc, but the helper thread calls
// cancelAllFunctionsAndWait() instead of cancelling "func" by name. The main
// thread asserts that waiting on the baton succeeds within testInterval(15).
// NOTE(review): the start() call and the baton post are not visible in this
// listing.
636 TEST(FunctionScheduler, CancelAllAndWaitWithRunningFunc) {
637 folly::Baton<> baton;
638 std::thread th([&baton]() {
639 FunctionScheduler fs;
640 fs.addFunction([] { delay(10); }, testInterval(2), "func");
643 fs.cancelAllFunctionsAndWait();
647 ASSERT_TRUE(baton.try_wait_for(testInterval(15)));
// On a helper thread, calls cancelAllFunctionsAndWait() and then expects the
// atomic counter nExecuted to equal 1. The main thread asserts that waiting
// on the baton succeeds within testInterval(15).
// NOTE(review): the function registrations that update nExecuted are not
// visible in this listing.
651 TEST(FunctionScheduler, CancelAllAndWaitWithOneRunningAndOneWaiting) {
652 folly::Baton<> baton;
653 std::thread th([&baton]() {
654 std::atomic<int> nExecuted(0);
655 FunctionScheduler fs;
673 fs.cancelAllFunctionsAndWait();
674 EXPECT_EQ(nExecuted, 1);
678 ASSERT_TRUE(baton.try_wait_for(testInterval(15)));
// Two threads both call cancelFunctionAndWait("func") on the same scheduler:
// th1's call is expected to return true and th2's call false.
// NOTE(review): the statements that order the two threads relative to each
// other, and the joins, are not visible in this listing.
682 TEST(FunctionScheduler, ConcurrentCancelFunctionAndWait) {
683 FunctionScheduler fs;
684 fs.addFunction([] { delay(10); }, testInterval(2), "func");
688 std::thread th1([&fs] { EXPECT_TRUE(fs.cancelFunctionAndWait("func")); });
690 std::thread th2([&fs] { EXPECT_FALSE(fs.cancelFunctionAndWait("func")); });