Consistency in namespace-closing comments
[folly.git] / folly / experimental / test / FunctionSchedulerTest.cpp
1 /*
2  * Copyright 2017 Facebook, Inc.
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  *   http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
#include <algorithm>
#include <atomic>
#include <cassert>
#include <cerrno>
#include <chrono>
#include <random>
#include <thread>
20
21 #include <folly/Baton.h>
22 #include <folly/Random.h>
23 #include <folly/experimental/FunctionScheduler.h>
24 #include <folly/portability/GTest.h>
25
26 #if defined(__linux__)
27 #include <dlfcn.h>
28 #endif
29
30 using namespace folly;
31 using std::chrono::milliseconds;
32
namespace {

/*
 * Helper functions for controlling how long this test takes.
 *
 * Using larger intervals here will make the tests less flaky when run on
 * heavily loaded systems.  However, this will also make the tests take longer
 * to run.
 */

// Length of one test "tick"; every interval and delay is a multiple of it.
static const auto timeFactor = std::chrono::milliseconds(100);

// Returns the duration of n ticks (n * timeFactor).
std::chrono::milliseconds testInterval(int n) {
  return n * timeFactor;
}

// Clamps n into the closed range [min, max]; asserts that min <= max.
int getTicksWithinRange(int n, int min, int max) {
  assert(min <= max);
  n = std::max(min, n);
  n = std::min(max, n);
  return n;
}

// Blocks the calling thread for n ticks.
// Uses the standard sleep facility rather than POSIX usleep(), so the helper
// does not depend on <unistd.h> being pulled in by some other header.
void delay(int n) {
  std::this_thread::sleep_for(testInterval(n));
}

} // namespace
56
57 TEST(FunctionScheduler, StartAndShutdown) {
58   FunctionScheduler fs;
59   EXPECT_TRUE(fs.start());
60   EXPECT_FALSE(fs.start());
61   EXPECT_TRUE(fs.shutdown());
62   EXPECT_FALSE(fs.shutdown());
63   // start again
64   EXPECT_TRUE(fs.start());
65   EXPECT_FALSE(fs.start());
66   EXPECT_TRUE(fs.shutdown());
67   EXPECT_FALSE(fs.shutdown());
68 }
69
70 TEST(FunctionScheduler, SimpleAdd) {
71   int total = 0;
72   FunctionScheduler fs;
73   fs.addFunction([&] { total += 2; }, testInterval(2), "add2");
74   fs.start();
75   delay(1);
76   EXPECT_EQ(2, total);
77   fs.shutdown();
78   delay(2);
79   EXPECT_EQ(2, total);
80 }
81
82 TEST(FunctionScheduler, AddCancel) {
83   int total = 0;
84   FunctionScheduler fs;
85   fs.addFunction([&] { total += 2; }, testInterval(2), "add2");
86   fs.start();
87   delay(1);
88   EXPECT_EQ(2, total);
89   delay(2);
90   EXPECT_EQ(4, total);
91   EXPECT_TRUE(fs.cancelFunction("add2"));
92   EXPECT_FALSE(fs.cancelFunction("NO SUCH FUNC"));
93   delay(2);
94   EXPECT_EQ(4, total);
95   fs.addFunction([&] { total += 1; }, testInterval(2), "add2");
96   delay(1);
97   EXPECT_EQ(5, total);
98   delay(2);
99   EXPECT_EQ(6, total);
100   fs.shutdown();
101 }
102
103 TEST(FunctionScheduler, AddCancel2) {
104   int total = 0;
105   FunctionScheduler fs;
106
107   // Test adds and cancels while the scheduler is stopped
108   EXPECT_FALSE(fs.cancelFunction("add2"));
109   fs.addFunction([&] { total += 1; }, testInterval(2), "add2");
110   EXPECT_TRUE(fs.cancelFunction("add2"));
111   EXPECT_FALSE(fs.cancelFunction("add2"));
112   fs.addFunction([&] { total += 2; }, testInterval(2), "add2");
113   fs.addFunction([&] { total += 3; }, testInterval(3), "add3");
114
115   EXPECT_EQ(0, total);
116   fs.start();
117   delay(1);
118   EXPECT_EQ(5, total);
119
120   // Cancel add2 while the scheduler is running
121   EXPECT_TRUE(fs.cancelFunction("add2"));
122   EXPECT_FALSE(fs.cancelFunction("add2"));
123   EXPECT_FALSE(fs.cancelFunction("bogus"));
124
125   delay(3);
126   EXPECT_EQ(8, total);
127   EXPECT_TRUE(fs.cancelFunction("add3"));
128
129   // Test a function that cancels itself
130   int selfCancelCount = 0;
131   fs.addFunction(
132       [&] {
133         ++selfCancelCount;
134         if (selfCancelCount > 2) {
135           fs.cancelFunction("selfCancel");
136         }
137       },
138       testInterval(1), "selfCancel", testInterval(1));
139   delay(4);
140   EXPECT_EQ(3, selfCancelCount);
141   EXPECT_FALSE(fs.cancelFunction("selfCancel"));
142
143   // Test a function that schedules another function
144   int adderCount = 0;
145   int fn2Count = 0;
146   auto fn2 = [&] { ++fn2Count; };
147   auto fnAdder = [&] {
148     ++adderCount;
149     if (adderCount == 2) {
150       fs.addFunction(fn2, testInterval(3), "fn2", testInterval(2));
151     }
152   };
153   fs.addFunction(fnAdder, testInterval(4), "adder");
154   // t0: adder fires
155   delay(1); // t1
156   EXPECT_EQ(1, adderCount);
157   EXPECT_EQ(0, fn2Count);
158   // t4: adder fires, schedules fn2
159   delay(4); // t5
160   EXPECT_EQ(2, adderCount);
161   EXPECT_EQ(0, fn2Count);
162   // t6: fn2 fires
163   delay(2); // t7
164   EXPECT_EQ(2, adderCount);
165   EXPECT_EQ(1, fn2Count);
166   // t8: adder fires
167   // t9: fn2 fires
168   delay(3); // t10
169   EXPECT_EQ(3, adderCount);
170   EXPECT_EQ(2, fn2Count);
171   EXPECT_TRUE(fs.cancelFunction("fn2"));
172   EXPECT_TRUE(fs.cancelFunction("adder"));
173   delay(5); // t10
174   EXPECT_EQ(3, adderCount);
175   EXPECT_EQ(2, fn2Count);
176
177   EXPECT_EQ(8, total);
178   EXPECT_EQ(3, selfCancelCount);
179 }
180
181 TEST(FunctionScheduler, AddMultiple) {
182   int total = 0;
183   FunctionScheduler fs;
184   fs.addFunction([&] { total += 2; }, testInterval(2), "add2");
185   fs.addFunction([&] { total += 3; }, testInterval(3), "add3");
186   EXPECT_THROW(fs.addFunction([&] { total += 2; }, testInterval(2), "add2"),
187                std::invalid_argument); // function name already exists
188
189   fs.start();
190   delay(1);
191   EXPECT_EQ(5, total);
192   delay(4);
193   EXPECT_EQ(12, total);
194   EXPECT_TRUE(fs.cancelFunction("add2"));
195   delay(2);
196   EXPECT_EQ(15, total);
197   fs.shutdown();
198   delay(3);
199   EXPECT_EQ(15, total);
200   fs.shutdown();
201 }
202
203 TEST(FunctionScheduler, AddAfterStart) {
204   int total = 0;
205   FunctionScheduler fs;
206   fs.addFunction([&] { total += 2; }, testInterval(2), "add2");
207   fs.addFunction([&] { total += 3; }, testInterval(2), "add3");
208   fs.start();
209   delay(3);
210   EXPECT_EQ(10, total);
211   fs.addFunction([&] { total += 2; }, testInterval(3), "add22");
212   delay(2);
213   EXPECT_EQ(17, total);
214 }
215
216 TEST(FunctionScheduler, ShutdownStart) {
217   int total = 0;
218   FunctionScheduler fs;
219   fs.addFunction([&] { total += 2; }, testInterval(2), "add2");
220   fs.start();
221   delay(1);
222   fs.shutdown();
223   fs.start();
224   delay(1);
225   EXPECT_EQ(4, total);
226   EXPECT_FALSE(fs.cancelFunction("add3")); // non existing
227   delay(2);
228   EXPECT_EQ(6, total);
229 }
230
231 TEST(FunctionScheduler, ResetFunc) {
232   int total = 0;
233   FunctionScheduler fs;
234   fs.addFunction([&] { total += 2; }, testInterval(3), "add2");
235   fs.addFunction([&] { total += 3; }, testInterval(3), "add3");
236   fs.start();
237   delay(1);
238   EXPECT_EQ(5, total);
239   EXPECT_FALSE(fs.resetFunctionTimer("NON_EXISTING"));
240   EXPECT_TRUE(fs.resetFunctionTimer("add2"));
241   delay(1);
242   // t2: after the reset, add2 should have been invoked immediately
243   EXPECT_EQ(7, total);
244   usleep(150000);
245   // t3.5: add3 should have been invoked. add2 should not
246   EXPECT_EQ(10, total);
247   delay(1);
248   // t4.5: add2 should have been invoked once more (it was reset at t1)
249   EXPECT_EQ(12, total);
250 }
251
252 TEST(FunctionScheduler, AddInvalid) {
253   int total = 0;
254   FunctionScheduler fs;
255   // interval may not be negative
256   EXPECT_THROW(fs.addFunction([&] { total += 2; }, testInterval(-1), "add2"),
257                std::invalid_argument);
258
259   EXPECT_FALSE(fs.cancelFunction("addNoFunc"));
260 }
261
262 TEST(FunctionScheduler, NoFunctions) {
263   FunctionScheduler fs;
264   EXPECT_TRUE(fs.start());
265   fs.shutdown();
266   FunctionScheduler fs2;
267   fs2.shutdown();
268 }
269
270 TEST(FunctionScheduler, AddWhileRunning) {
271   int total = 0;
272   FunctionScheduler fs;
273   fs.start();
274   delay(1);
275   fs.addFunction([&] { total += 2; }, testInterval(2), "add2");
276   // The function should be invoked nearly immediately when we add it
277   // and the FunctionScheduler is already running
278   usleep(50000);
279   EXPECT_EQ(2, total);
280   delay(2);
281   EXPECT_EQ(4, total);
282 }
283
284 TEST(FunctionScheduler, NoShutdown) {
285   int total = 0;
286   {
287     FunctionScheduler fs;
288     fs.addFunction([&] { total += 2; }, testInterval(1), "add2");
289     fs.start();
290     usleep(50000);
291     EXPECT_EQ(2, total);
292   }
293   // Destroyed the FunctionScheduler without calling shutdown.
294   // Everything should have been cleaned up, and the function will no longer
295   // get called.
296   delay(2);
297   EXPECT_EQ(2, total);
298 }
299
300 TEST(FunctionScheduler, StartDelay) {
301   int total = 0;
302   FunctionScheduler fs;
303   fs.addFunction([&] { total += 2; }, testInterval(2), "add2",
304                  testInterval(2));
305   fs.addFunction([&] { total += 3; }, testInterval(3), "add3",
306                  testInterval(2));
307   EXPECT_THROW(fs.addFunction([&] { total += 2; }, testInterval(3),
308                               "addX", testInterval(-1)),
309                std::invalid_argument);
310   fs.start();
311   delay(1); // t1
312   EXPECT_EQ(0, total);
313   // t2 : add2 total=2
314   // t2 : add3 total=5
315   delay(2); // t3
316   EXPECT_EQ(5, total);
317   // t4 : add2: total=7
318   // t5 : add3: total=10
319   // t6 : add2: total=12
320   delay(4); // t7
321   EXPECT_EQ(12, total);
322   fs.cancelFunction("add2");
323   // t8 : add3: total=15
324   delay(2); // t9
325   EXPECT_EQ(15, total);
326   fs.shutdown();
327   delay(3);
328   EXPECT_EQ(15, total);
329   fs.shutdown();
330 }
331
332 TEST(FunctionScheduler, NoSteadyCatchup) {
333   std::atomic<int> ticks(0);
334   FunctionScheduler fs;
335   // fs.setSteady(false); is the default
336   fs.addFunction([&ticks] {
337                    if (++ticks == 2) {
338                      std::this_thread::sleep_for(
339                          std::chrono::milliseconds(200));
340                    }
341                  },
342                  milliseconds(5));
343   fs.start();
344   std::this_thread::sleep_for(std::chrono::milliseconds(500));
345
346   // no steady catch up means we'd tick once for 200ms, then remaining
347   // 300ms / 5 = 60 times
348   EXPECT_LE(ticks.load(), 61);
349 }
350
351 TEST(FunctionScheduler, SteadyCatchup) {
352   std::atomic<int> ticks(0);
353   FunctionScheduler fs;
354   fs.setSteady(true);
355   fs.addFunction([&ticks] {
356                    if (++ticks == 2) {
357                      std::this_thread::sleep_for(
358                          std::chrono::milliseconds(200));
359                    }
360                  },
361                  milliseconds(5));
362   fs.start();
363
364   std::this_thread::sleep_for(std::chrono::milliseconds(500));
365
366   // tick every 5ms. Despite tick == 2 is slow, later ticks should be fast
367   // enough to catch back up to schedule
368   EXPECT_NEAR(100, ticks.load(), 10);
369 }
370
371 TEST(FunctionScheduler, UniformDistribution) {
372   int total = 0;
373   const int kTicks = 2;
374   std::chrono::milliseconds minInterval =
375       testInterval(kTicks) - (timeFactor / 5);
376   std::chrono::milliseconds maxInterval =
377       testInterval(kTicks) + (timeFactor / 5);
378   FunctionScheduler fs;
379   fs.addFunctionUniformDistribution([&] { total += 2; },
380                                     minInterval,
381                                     maxInterval,
382                                     "UniformDistribution",
383                                     std::chrono::milliseconds(0));
384   fs.start();
385   delay(1);
386   EXPECT_EQ(2, total);
387   delay(kTicks);
388   EXPECT_EQ(4, total);
389   delay(kTicks);
390   EXPECT_EQ(6, total);
391   fs.shutdown();
392   delay(2);
393   EXPECT_EQ(6, total);
394 }
395
396 TEST(FunctionScheduler, ExponentialBackoff) {
397   int total = 0;
398   int expectedInterval = 0;
399   int nextInterval = 2;
400   FunctionScheduler fs;
401   fs.addFunctionGenericDistribution(
402       [&] { total += 2; },
403       [&expectedInterval, nextInterval]() mutable {
404         expectedInterval = nextInterval;
405         nextInterval *= nextInterval;
406         return testInterval(expectedInterval);
407       },
408       "ExponentialBackoff",
409       "2^n * 100ms",
410       std::chrono::milliseconds(0));
411   fs.start();
412   delay(1);
413   EXPECT_EQ(2, total);
414   delay(expectedInterval);
415   EXPECT_EQ(4, total);
416   delay(expectedInterval);
417   EXPECT_EQ(6, total);
418   fs.shutdown();
419   delay(2);
420   EXPECT_EQ(6, total);
421 }
422
423 TEST(FunctionScheduler, GammaIntervalDistribution) {
424   int total = 0;
425   int expectedInterval = 0;
426   FunctionScheduler fs;
427   std::default_random_engine generator(folly::Random::rand32());
428   // The alpha and beta arguments are selected, somewhat randomly, to be 2.0.
429   // These values do not matter much in this test, as we are not testing the
430   // std::gamma_distribution itself...
431   std::gamma_distribution<double> gamma(2.0, 2.0);
432   fs.addFunctionGenericDistribution(
433       [&] { total += 2; },
434       [&expectedInterval, generator, gamma]() mutable {
435         expectedInterval =
436             getTicksWithinRange(static_cast<int>(gamma(generator)), 2, 10);
437         return testInterval(expectedInterval);
438       },
439       "GammaDistribution",
440       "gamma(2.0,2.0)*100ms",
441       std::chrono::milliseconds(0));
442   fs.start();
443   delay(1);
444   EXPECT_EQ(2, total);
445   delay(expectedInterval);
446   EXPECT_EQ(4, total);
447   delay(expectedInterval);
448   EXPECT_EQ(6, total);
449   fs.shutdown();
450   delay(2);
451   EXPECT_EQ(6, total);
452 }
453
454 TEST(FunctionScheduler, AddWithRunOnce) {
455   int total = 0;
456   FunctionScheduler fs;
457   fs.addFunctionOnce([&] { total += 2; }, "add2");
458   fs.start();
459   delay(1);
460   EXPECT_EQ(2, total);
461   delay(2);
462   EXPECT_EQ(2, total);
463
464   fs.addFunctionOnce([&] { total += 2; }, "add2");
465   delay(1);
466   EXPECT_EQ(4, total);
467   delay(2);
468   EXPECT_EQ(4, total);
469
470   fs.shutdown();
471 }
472
473 TEST(FunctionScheduler, cancelFunctionAndWait) {
474   int total = 0;
475   FunctionScheduler fs;
476   fs.addFunction(
477       [&] {
478         delay(5);
479         total += 2;
480       },
481       testInterval(100),
482       "add2");
483
484   fs.start();
485   delay(1);
486   EXPECT_EQ(0, total); // add2 is still sleeping
487
488   EXPECT_TRUE(fs.cancelFunctionAndWait("add2"));
489   EXPECT_EQ(2, total); // add2 should have completed
490
491   EXPECT_FALSE(fs.cancelFunction("add2")); // add2 has been canceled
492   fs.shutdown();
493 }
494
495 #if defined(__linux__)
namespace {
/**
 * Scoped switch used to make our pthread_create() wrapper fail.
 *
 * Each live PThreadCreateFailure object bumps a shared counter; shouldFail()
 * reports true for as long as at least one such object exists.
 */
class PThreadCreateFailure {
 public:
  PThreadCreateFailure() {
    forceFailure_.fetch_add(1);
  }
  ~PThreadCreateFailure() {
    forceFailure_.fetch_sub(1);
  }

  // True while one or more PThreadCreateFailure objects are alive.
  static bool shouldFail() {
    const int liveInstances = forceFailure_.load();
    return liveInstances > 0;
  }

 private:
  // Number of PThreadCreateFailure objects currently alive.
  static std::atomic<int> forceFailure_;
};

std::atomic<int> PThreadCreateFailure::forceFailure_{0};
} // namespace
520
521 // Replace the system pthread_create() function with our own stub, so we can
522 // trigger failures in the StartThrows() test.
523 extern "C" int pthread_create(
524     pthread_t* thread,
525     const pthread_attr_t* attr,
526     void* (*start_routine)(void*),
527     void* arg) {
528   static const auto realFunction = reinterpret_cast<decltype(&pthread_create)>(
529       dlsym(RTLD_NEXT, "pthread_create"));
530   // For sanity, make sure we didn't find ourself,
531   // since that would cause infinite recursion.
532   CHECK_NE(realFunction, pthread_create);
533
534   if (PThreadCreateFailure::shouldFail()) {
535     errno = EINVAL;
536     return -1;
537   }
538   return realFunction(thread, attr, start_routine, arg);
539 }
540
541 TEST(FunctionScheduler, StartThrows) {
542   FunctionScheduler fs;
543   PThreadCreateFailure fail;
544   EXPECT_ANY_THROW(fs.start());
545   EXPECT_NO_THROW(fs.shutdown());
546 }
547 #endif
548
549 TEST(FunctionScheduler, cancelAllFunctionsAndWait) {
550   int total = 0;
551   FunctionScheduler fs;
552
553   fs.addFunction(
554       [&] {
555         delay(5);
556         total += 2;
557       },
558       testInterval(100),
559       "add2");
560
561   fs.start();
562   delay(1);
563   EXPECT_EQ(0, total); // add2 is still sleeping
564
565   fs.cancelAllFunctionsAndWait();
566   EXPECT_EQ(2, total);
567
568   EXPECT_FALSE(fs.cancelFunction("add2")); // add2 has been canceled
569   fs.shutdown();
570 }
571
572 TEST(FunctionScheduler, CancelAndWaitOnRunningFunc) {
573   folly::Baton<> baton;
574   std::thread th([&baton]() {
575     FunctionScheduler fs;
576     fs.addFunction([] { delay(10); }, testInterval(2), "func");
577     fs.start();
578     delay(1);
579     EXPECT_TRUE(fs.cancelFunctionAndWait("func"));
580     baton.post();
581   });
582
583   ASSERT_TRUE(baton.timed_wait(testInterval(15)));
584   th.join();
585 }
586
587 TEST(FunctionScheduler, CancelAllAndWaitWithRunningFunc) {
588   folly::Baton<> baton;
589   std::thread th([&baton]() {
590     FunctionScheduler fs;
591     fs.addFunction([] { delay(10); }, testInterval(2), "func");
592     fs.start();
593     delay(1);
594     fs.cancelAllFunctionsAndWait();
595     baton.post();
596   });
597
598   ASSERT_TRUE(baton.timed_wait(testInterval(15)));
599   th.join();
600 }
601
602 TEST(FunctionScheduler, CancelAllAndWaitWithOneRunningAndOneWaiting) {
603   folly::Baton<> baton;
604   std::thread th([&baton]() {
605     std::atomic<int> nExecuted(0);
606     FunctionScheduler fs;
607     fs.addFunction(
608         [&nExecuted] {
609           nExecuted++;
610           delay(10);
611         },
612         testInterval(2),
613         "func0");
614     fs.addFunction(
615         [&nExecuted] {
616           nExecuted++;
617           delay(10);
618         },
619         testInterval(2),
620         "func1",
621         testInterval(5));
622     fs.start();
623     delay(1);
624     fs.cancelAllFunctionsAndWait();
625     EXPECT_EQ(nExecuted, 1);
626     baton.post();
627   });
628
629   ASSERT_TRUE(baton.timed_wait(testInterval(15)));
630   th.join();
631 }
632
633 TEST(FunctionScheduler, ConcurrentCancelFunctionAndWait) {
634   FunctionScheduler fs;
635   fs.addFunction([] { delay(10); }, testInterval(2), "func");
636
637   fs.start();
638   delay(1);
639   std::thread th1([&fs] { EXPECT_TRUE(fs.cancelFunctionAndWait("func")); });
640   delay(1);
641   std::thread th2([&fs] { EXPECT_FALSE(fs.cancelFunctionAndWait("func")); });
642   th1.join();
643   th2.join();
644 }