Fix copyright lines
[folly.git] / folly / experimental / test / FunctionSchedulerTest.cpp
1 /*
2  * Copyright 2015-present Facebook, Inc.
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  *   http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16
17 #include <algorithm>
18 #include <atomic>
19 #include <cassert>
20 #include <random>
21
22 #include <boost/thread.hpp>
23
24 #include <folly/Random.h>
25 #include <folly/experimental/FunctionScheduler.h>
26 #include <folly/portability/GTest.h>
27 #include <folly/synchronization/Baton.h>
28
29 #if defined(__linux__)
30 #include <dlfcn.h>
31 #endif
32
33 using namespace folly;
34 using std::chrono::milliseconds;
35
36 namespace {
37
38 /*
39  * Helper functions for controlling how long this test takes.
40  *
41  * Using larger intervals here will make the tests less flaky when run on
42  * heavily loaded systems.  However, this will also make the tests take longer
43  * to run.
44  */
45 static const auto timeFactor = std::chrono::milliseconds(100);
46 std::chrono::milliseconds testInterval(int n) { return n * timeFactor; }
47 int getTicksWithinRange(int n, int min, int max) {
48   assert(min <= max);
49   n = std::max(min, n);
50   n = std::min(max, n);
51   return n;
52 }
53 void delay(int n) {
54   std::chrono::microseconds usec(n * timeFactor);
55   usleep(usec.count());
56 }
57
58 } // namespace
59
60 TEST(FunctionScheduler, StartAndShutdown) {
61   FunctionScheduler fs;
62   EXPECT_TRUE(fs.start());
63   EXPECT_FALSE(fs.start());
64   EXPECT_TRUE(fs.shutdown());
65   EXPECT_FALSE(fs.shutdown());
66   // start again
67   EXPECT_TRUE(fs.start());
68   EXPECT_FALSE(fs.start());
69   EXPECT_TRUE(fs.shutdown());
70   EXPECT_FALSE(fs.shutdown());
71 }
72
73 TEST(FunctionScheduler, SimpleAdd) {
74   int total = 0;
75   FunctionScheduler fs;
76   fs.addFunction([&] { total += 2; }, testInterval(2), "add2");
77   fs.start();
78   delay(1);
79   EXPECT_EQ(2, total);
80   fs.shutdown();
81   delay(2);
82   EXPECT_EQ(2, total);
83 }
84
85 TEST(FunctionScheduler, AddCancel) {
86   int total = 0;
87   FunctionScheduler fs;
88   fs.addFunction([&] { total += 2; }, testInterval(2), "add2");
89   fs.start();
90   delay(1);
91   EXPECT_EQ(2, total);
92   delay(2);
93   EXPECT_EQ(4, total);
94   EXPECT_TRUE(fs.cancelFunction("add2"));
95   EXPECT_FALSE(fs.cancelFunction("NO SUCH FUNC"));
96   delay(2);
97   EXPECT_EQ(4, total);
98   fs.addFunction([&] { total += 1; }, testInterval(2), "add2");
99   delay(1);
100   EXPECT_EQ(5, total);
101   delay(2);
102   EXPECT_EQ(6, total);
103   fs.shutdown();
104 }
105
106 TEST(FunctionScheduler, AddCancel2) {
107   int total = 0;
108   FunctionScheduler fs;
109
110   // Test adds and cancels while the scheduler is stopped
111   EXPECT_FALSE(fs.cancelFunction("add2"));
112   fs.addFunction([&] { total += 1; }, testInterval(2), "add2");
113   EXPECT_TRUE(fs.cancelFunction("add2"));
114   EXPECT_FALSE(fs.cancelFunction("add2"));
115   fs.addFunction([&] { total += 2; }, testInterval(2), "add2");
116   fs.addFunction([&] { total += 3; }, testInterval(3), "add3");
117
118   EXPECT_EQ(0, total);
119   fs.start();
120   delay(1);
121   EXPECT_EQ(5, total);
122
123   // Cancel add2 while the scheduler is running
124   EXPECT_TRUE(fs.cancelFunction("add2"));
125   EXPECT_FALSE(fs.cancelFunction("add2"));
126   EXPECT_FALSE(fs.cancelFunction("bogus"));
127
128   delay(3);
129   EXPECT_EQ(8, total);
130   EXPECT_TRUE(fs.cancelFunction("add3"));
131
132   // Test a function that cancels itself
133   int selfCancelCount = 0;
134   fs.addFunction(
135       [&] {
136         ++selfCancelCount;
137         if (selfCancelCount > 2) {
138           fs.cancelFunction("selfCancel");
139         }
140       },
141       testInterval(1), "selfCancel", testInterval(1));
142   delay(4);
143   EXPECT_EQ(3, selfCancelCount);
144   EXPECT_FALSE(fs.cancelFunction("selfCancel"));
145
146   // Test a function that schedules another function
147   int adderCount = 0;
148   int fn2Count = 0;
149   auto fn2 = [&] { ++fn2Count; };
150   auto fnAdder = [&] {
151     ++adderCount;
152     if (adderCount == 2) {
153       fs.addFunction(fn2, testInterval(3), "fn2", testInterval(2));
154     }
155   };
156   fs.addFunction(fnAdder, testInterval(4), "adder");
157   // t0: adder fires
158   delay(1); // t1
159   EXPECT_EQ(1, adderCount);
160   EXPECT_EQ(0, fn2Count);
161   // t4: adder fires, schedules fn2
162   delay(4); // t5
163   EXPECT_EQ(2, adderCount);
164   EXPECT_EQ(0, fn2Count);
165   // t6: fn2 fires
166   delay(2); // t7
167   EXPECT_EQ(2, adderCount);
168   EXPECT_EQ(1, fn2Count);
169   // t8: adder fires
170   // t9: fn2 fires
171   delay(3); // t10
172   EXPECT_EQ(3, adderCount);
173   EXPECT_EQ(2, fn2Count);
174   EXPECT_TRUE(fs.cancelFunction("fn2"));
175   EXPECT_TRUE(fs.cancelFunction("adder"));
176   delay(5); // t10
177   EXPECT_EQ(3, adderCount);
178   EXPECT_EQ(2, fn2Count);
179
180   EXPECT_EQ(8, total);
181   EXPECT_EQ(3, selfCancelCount);
182 }
183
184 TEST(FunctionScheduler, AddMultiple) {
185   int total = 0;
186   FunctionScheduler fs;
187   fs.addFunction([&] { total += 2; }, testInterval(2), "add2");
188   fs.addFunction([&] { total += 3; }, testInterval(3), "add3");
189   EXPECT_THROW(fs.addFunction([&] { total += 2; }, testInterval(2), "add2"),
190                std::invalid_argument); // function name already exists
191
192   fs.start();
193   delay(1);
194   EXPECT_EQ(5, total);
195   delay(4);
196   EXPECT_EQ(12, total);
197   EXPECT_TRUE(fs.cancelFunction("add2"));
198   delay(2);
199   EXPECT_EQ(15, total);
200   fs.shutdown();
201   delay(3);
202   EXPECT_EQ(15, total);
203   fs.shutdown();
204 }
205
206 TEST(FunctionScheduler, AddAfterStart) {
207   int total = 0;
208   FunctionScheduler fs;
209   fs.addFunction([&] { total += 2; }, testInterval(2), "add2");
210   fs.addFunction([&] { total += 3; }, testInterval(2), "add3");
211   fs.start();
212   delay(3);
213   EXPECT_EQ(10, total);
214   fs.addFunction([&] { total += 2; }, testInterval(3), "add22");
215   delay(2);
216   EXPECT_EQ(17, total);
217 }
218
219 TEST(FunctionScheduler, ShutdownStart) {
220   int total = 0;
221   FunctionScheduler fs;
222   fs.addFunction([&] { total += 2; }, testInterval(2), "add2");
223   fs.start();
224   delay(1);
225   fs.shutdown();
226   fs.start();
227   delay(1);
228   EXPECT_EQ(4, total);
229   EXPECT_FALSE(fs.cancelFunction("add3")); // non existing
230   delay(2);
231   EXPECT_EQ(6, total);
232 }
233
234 TEST(FunctionScheduler, ResetFunc) {
235   int total = 0;
236   FunctionScheduler fs;
237   fs.addFunction([&] { total += 2; }, testInterval(3), "add2");
238   fs.addFunction([&] { total += 3; }, testInterval(3), "add3");
239   fs.start();
240   delay(1);
241   EXPECT_EQ(5, total);
242   EXPECT_FALSE(fs.resetFunctionTimer("NON_EXISTING"));
243   EXPECT_TRUE(fs.resetFunctionTimer("add2"));
244   delay(1);
245   // t2: after the reset, add2 should have been invoked immediately
246   EXPECT_EQ(7, total);
247   usleep(150000);
248   // t3.5: add3 should have been invoked. add2 should not
249   EXPECT_EQ(10, total);
250   delay(1);
251   // t4.5: add2 should have been invoked once more (it was reset at t1)
252   EXPECT_EQ(12, total);
253 }
254
255 TEST(FunctionScheduler, ResetFuncWhileRunning) {
256   struct State {
257     boost::barrier barrier_a{2};
258     boost::barrier barrier_b{2};
259     boost::barrier barrier_c{2};
260     boost::barrier barrier_d{2};
261     bool set = false;
262     size_t count = 0;
263   };
264
265   State state; // held by ref
266   auto mv = std::make_shared<size_t>(); // gets moved
267
268   FunctionScheduler fs;
269   fs.addFunction(
270       [&, mv /* ref + shared_ptr fit in in-situ storage */] {
271         if (!state.set) { // first invocation
272           state.barrier_a.wait();
273           // ensure that resetFunctionTimer is called in this critical section
274           state.barrier_b.wait();
275           ++state.count;
276           EXPECT_TRUE(bool(mv)) << "bug repro: mv was moved-out";
277           state.barrier_c.wait();
278           // main thread checks count here
279           state.barrier_d.wait();
280         } else { // subsequent invocations
281           ++state.count;
282         }
283       },
284       testInterval(3),
285       "nada");
286   fs.start();
287
288   state.barrier_a.wait();
289   state.set = true;
290   fs.resetFunctionTimer("nada");
291   EXPECT_EQ(0, state.count) << "sanity check";
292   state.barrier_b.wait();
293   // fn thread increments count and checks mv here
294   state.barrier_c.wait();
295   EXPECT_EQ(1, state.count) << "sanity check";
296   state.barrier_d.wait();
297   delay(1);
298   EXPECT_EQ(2, state.count) << "sanity check";
299 }
300
301 TEST(FunctionScheduler, AddInvalid) {
302   int total = 0;
303   FunctionScheduler fs;
304   // interval may not be negative
305   EXPECT_THROW(fs.addFunction([&] { total += 2; }, testInterval(-1), "add2"),
306                std::invalid_argument);
307
308   EXPECT_FALSE(fs.cancelFunction("addNoFunc"));
309 }
310
311 TEST(FunctionScheduler, NoFunctions) {
312   FunctionScheduler fs;
313   EXPECT_TRUE(fs.start());
314   fs.shutdown();
315   FunctionScheduler fs2;
316   fs2.shutdown();
317 }
318
319 TEST(FunctionScheduler, AddWhileRunning) {
320   int total = 0;
321   FunctionScheduler fs;
322   fs.start();
323   delay(1);
324   fs.addFunction([&] { total += 2; }, testInterval(2), "add2");
325   // The function should be invoked nearly immediately when we add it
326   // and the FunctionScheduler is already running
327   usleep(50000);
328   EXPECT_EQ(2, total);
329   delay(2);
330   EXPECT_EQ(4, total);
331 }
332
333 TEST(FunctionScheduler, NoShutdown) {
334   int total = 0;
335   {
336     FunctionScheduler fs;
337     fs.addFunction([&] { total += 2; }, testInterval(1), "add2");
338     fs.start();
339     usleep(50000);
340     EXPECT_EQ(2, total);
341   }
342   // Destroyed the FunctionScheduler without calling shutdown.
343   // Everything should have been cleaned up, and the function will no longer
344   // get called.
345   delay(2);
346   EXPECT_EQ(2, total);
347 }
348
349 TEST(FunctionScheduler, StartDelay) {
350   int total = 0;
351   FunctionScheduler fs;
352   fs.addFunction([&] { total += 2; }, testInterval(2), "add2",
353                  testInterval(2));
354   fs.addFunction([&] { total += 3; }, testInterval(3), "add3",
355                  testInterval(2));
356   EXPECT_THROW(fs.addFunction([&] { total += 2; }, testInterval(3),
357                               "addX", testInterval(-1)),
358                std::invalid_argument);
359   fs.start();
360   delay(1); // t1
361   EXPECT_EQ(0, total);
362   // t2 : add2 total=2
363   // t2 : add3 total=5
364   delay(2); // t3
365   EXPECT_EQ(5, total);
366   // t4 : add2: total=7
367   // t5 : add3: total=10
368   // t6 : add2: total=12
369   delay(4); // t7
370   EXPECT_EQ(12, total);
371   fs.cancelFunction("add2");
372   // t8 : add3: total=15
373   delay(2); // t9
374   EXPECT_EQ(15, total);
375   fs.shutdown();
376   delay(3);
377   EXPECT_EQ(15, total);
378   fs.shutdown();
379 }
380
381 TEST(FunctionScheduler, NoSteadyCatchup) {
382   std::atomic<int> ticks(0);
383   FunctionScheduler fs;
384   // fs.setSteady(false); is the default
385   fs.addFunction([&ticks] {
386                    if (++ticks == 2) {
387                      std::this_thread::sleep_for(
388                          std::chrono::milliseconds(200));
389                    }
390                  },
391                  milliseconds(5));
392   fs.start();
393   std::this_thread::sleep_for(std::chrono::milliseconds(500));
394
395   // no steady catch up means we'd tick once for 200ms, then remaining
396   // 300ms / 5 = 60 times
397   EXPECT_LE(ticks.load(), 61);
398 }
399
400 TEST(FunctionScheduler, SteadyCatchup) {
401   std::atomic<int> ticks(0);
402   FunctionScheduler fs;
403   fs.setSteady(true);
404   fs.addFunction([&ticks] {
405                    if (++ticks == 2) {
406                      std::this_thread::sleep_for(
407                          std::chrono::milliseconds(200));
408                    }
409                  },
410                  milliseconds(5));
411   fs.start();
412
413   std::this_thread::sleep_for(std::chrono::milliseconds(500));
414
415   // tick every 5ms. Despite tick == 2 is slow, later ticks should be fast
416   // enough to catch back up to schedule
417   EXPECT_NEAR(100, ticks.load(), 10);
418 }
419
420 TEST(FunctionScheduler, UniformDistribution) {
421   int total = 0;
422   const int kTicks = 2;
423   std::chrono::milliseconds minInterval =
424       testInterval(kTicks) - (timeFactor / 5);
425   std::chrono::milliseconds maxInterval =
426       testInterval(kTicks) + (timeFactor / 5);
427   FunctionScheduler fs;
428   fs.addFunctionUniformDistribution([&] { total += 2; },
429                                     minInterval,
430                                     maxInterval,
431                                     "UniformDistribution",
432                                     std::chrono::milliseconds(0));
433   fs.start();
434   delay(1);
435   EXPECT_EQ(2, total);
436   delay(kTicks);
437   EXPECT_EQ(4, total);
438   delay(kTicks);
439   EXPECT_EQ(6, total);
440   fs.shutdown();
441   delay(2);
442   EXPECT_EQ(6, total);
443 }
444
445 TEST(FunctionScheduler, ExponentialBackoff) {
446   int total = 0;
447   int expectedInterval = 0;
448   int nextInterval = 2;
449   FunctionScheduler fs;
450   fs.addFunctionGenericDistribution(
451       [&] { total += 2; },
452       [&expectedInterval, nextInterval]() mutable {
453         expectedInterval = nextInterval;
454         nextInterval *= nextInterval;
455         return testInterval(expectedInterval);
456       },
457       "ExponentialBackoff",
458       "2^n * 100ms",
459       std::chrono::milliseconds(0));
460   fs.start();
461   delay(1);
462   EXPECT_EQ(2, total);
463   delay(expectedInterval);
464   EXPECT_EQ(4, total);
465   delay(expectedInterval);
466   EXPECT_EQ(6, total);
467   fs.shutdown();
468   delay(2);
469   EXPECT_EQ(6, total);
470 }
471
472 TEST(FunctionScheduler, GammaIntervalDistribution) {
473   int total = 0;
474   int expectedInterval = 0;
475   FunctionScheduler fs;
476   std::default_random_engine generator(folly::Random::rand32());
477   // The alpha and beta arguments are selected, somewhat randomly, to be 2.0.
478   // These values do not matter much in this test, as we are not testing the
479   // std::gamma_distribution itself...
480   std::gamma_distribution<double> gamma(2.0, 2.0);
481   fs.addFunctionGenericDistribution(
482       [&] { total += 2; },
483       [&expectedInterval, generator, gamma]() mutable {
484         expectedInterval =
485             getTicksWithinRange(static_cast<int>(gamma(generator)), 2, 10);
486         return testInterval(expectedInterval);
487       },
488       "GammaDistribution",
489       "gamma(2.0,2.0)*100ms",
490       std::chrono::milliseconds(0));
491   fs.start();
492   delay(1);
493   EXPECT_EQ(2, total);
494   delay(expectedInterval);
495   EXPECT_EQ(4, total);
496   delay(expectedInterval);
497   EXPECT_EQ(6, total);
498   fs.shutdown();
499   delay(2);
500   EXPECT_EQ(6, total);
501 }
502
503 TEST(FunctionScheduler, AddWithRunOnce) {
504   int total = 0;
505   FunctionScheduler fs;
506   fs.addFunctionOnce([&] { total += 2; }, "add2");
507   fs.start();
508   delay(1);
509   EXPECT_EQ(2, total);
510   delay(2);
511   EXPECT_EQ(2, total);
512
513   fs.addFunctionOnce([&] { total += 2; }, "add2");
514   delay(1);
515   EXPECT_EQ(4, total);
516   delay(2);
517   EXPECT_EQ(4, total);
518
519   fs.shutdown();
520 }
521
522 TEST(FunctionScheduler, cancelFunctionAndWait) {
523   int total = 0;
524   FunctionScheduler fs;
525   fs.addFunction(
526       [&] {
527         delay(5);
528         total += 2;
529       },
530       testInterval(100),
531       "add2");
532
533   fs.start();
534   delay(1);
535   EXPECT_EQ(0, total); // add2 is still sleeping
536
537   EXPECT_TRUE(fs.cancelFunctionAndWait("add2"));
538   EXPECT_EQ(2, total); // add2 should have completed
539
540   EXPECT_FALSE(fs.cancelFunction("add2")); // add2 has been canceled
541   fs.shutdown();
542 }
543
544 #if defined(__linux__)
545 namespace {
546 /**
547  * A helper class that forces our pthread_create() wrapper to fail when
548  * an PThreadCreateFailure object exists.
549  */
550 class PThreadCreateFailure {
551  public:
552   PThreadCreateFailure() {
553     ++forceFailure_;
554   }
555   ~PThreadCreateFailure() {
556     --forceFailure_;
557   }
558
559   static bool shouldFail() {
560     return forceFailure_ > 0;
561   }
562
563  private:
564   static std::atomic<int> forceFailure_;
565 };
566
567 std::atomic<int> PThreadCreateFailure::forceFailure_{0};
568 } // namespace
569
570 // Replace the system pthread_create() function with our own stub, so we can
571 // trigger failures in the StartThrows() test.
572 extern "C" int pthread_create(
573     pthread_t* thread,
574     const pthread_attr_t* attr,
575     void* (*start_routine)(void*),
576     void* arg) {
577   static const auto realFunction = reinterpret_cast<decltype(&pthread_create)>(
578       dlsym(RTLD_NEXT, "pthread_create"));
579   // For sanity, make sure we didn't find ourself,
580   // since that would cause infinite recursion.
581   CHECK_NE(realFunction, pthread_create);
582
583   if (PThreadCreateFailure::shouldFail()) {
584     errno = EINVAL;
585     return -1;
586   }
587   return realFunction(thread, attr, start_routine, arg);
588 }
589
590 TEST(FunctionScheduler, StartThrows) {
591   FunctionScheduler fs;
592   PThreadCreateFailure fail;
593   EXPECT_ANY_THROW(fs.start());
594   EXPECT_NO_THROW(fs.shutdown());
595 }
596 #endif
597
598 TEST(FunctionScheduler, cancelAllFunctionsAndWait) {
599   int total = 0;
600   FunctionScheduler fs;
601
602   fs.addFunction(
603       [&] {
604         delay(5);
605         total += 2;
606       },
607       testInterval(100),
608       "add2");
609
610   fs.start();
611   delay(1);
612   EXPECT_EQ(0, total); // add2 is still sleeping
613
614   fs.cancelAllFunctionsAndWait();
615   EXPECT_EQ(2, total);
616
617   EXPECT_FALSE(fs.cancelFunction("add2")); // add2 has been canceled
618   fs.shutdown();
619 }
620
621 TEST(FunctionScheduler, CancelAndWaitOnRunningFunc) {
622   folly::Baton<> baton;
623   std::thread th([&baton]() {
624     FunctionScheduler fs;
625     fs.addFunction([] { delay(10); }, testInterval(2), "func");
626     fs.start();
627     delay(1);
628     EXPECT_TRUE(fs.cancelFunctionAndWait("func"));
629     baton.post();
630   });
631
632   ASSERT_TRUE(baton.try_wait_for(testInterval(15)));
633   th.join();
634 }
635
636 TEST(FunctionScheduler, CancelAllAndWaitWithRunningFunc) {
637   folly::Baton<> baton;
638   std::thread th([&baton]() {
639     FunctionScheduler fs;
640     fs.addFunction([] { delay(10); }, testInterval(2), "func");
641     fs.start();
642     delay(1);
643     fs.cancelAllFunctionsAndWait();
644     baton.post();
645   });
646
647   ASSERT_TRUE(baton.try_wait_for(testInterval(15)));
648   th.join();
649 }
650
651 TEST(FunctionScheduler, CancelAllAndWaitWithOneRunningAndOneWaiting) {
652   folly::Baton<> baton;
653   std::thread th([&baton]() {
654     std::atomic<int> nExecuted(0);
655     FunctionScheduler fs;
656     fs.addFunction(
657         [&nExecuted] {
658           nExecuted++;
659           delay(10);
660         },
661         testInterval(2),
662         "func0");
663     fs.addFunction(
664         [&nExecuted] {
665           nExecuted++;
666           delay(10);
667         },
668         testInterval(2),
669         "func1",
670         testInterval(5));
671     fs.start();
672     delay(1);
673     fs.cancelAllFunctionsAndWait();
674     EXPECT_EQ(nExecuted, 1);
675     baton.post();
676   });
677
678   ASSERT_TRUE(baton.try_wait_for(testInterval(15)));
679   th.join();
680 }
681
682 TEST(FunctionScheduler, ConcurrentCancelFunctionAndWait) {
683   FunctionScheduler fs;
684   fs.addFunction([] { delay(10); }, testInterval(2), "func");
685
686   fs.start();
687   delay(1);
688   std::thread th1([&fs] { EXPECT_TRUE(fs.cancelFunctionAndWait("func")); });
689   delay(1);
690   std::thread th2([&fs] { EXPECT_FALSE(fs.cancelFunctionAndWait("func")); });
691   th1.join();
692   th2.join();
693 }