Eliminate more VLAs
[folly.git] / folly / fibers / test / FibersTest.cpp
1 /*
2  * Copyright 2016 Facebook, Inc.
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  *   http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 #include <atomic>
17 #include <thread>
18 #include <vector>
19
20 #include <folly/Memory.h>
21 #include <folly/Random.h>
22 #include <folly/futures/Future.h>
23
24 #include <folly/Conv.h>
25 #include <folly/fibers/AddTasks.h>
26 #include <folly/fibers/AtomicBatchDispatcher.h>
27 #include <folly/fibers/BatchDispatcher.h>
28 #include <folly/fibers/EventBaseLoopController.h>
29 #include <folly/fibers/FiberManager.h>
30 #include <folly/fibers/FiberManagerMap.h>
31 #include <folly/fibers/GenericBaton.h>
32 #include <folly/fibers/Semaphore.h>
33 #include <folly/fibers/SimpleLoopController.h>
34 #include <folly/fibers/WhenN.h>
35 #include <folly/portability/GTest.h>
36
37 using namespace folly::fibers;
38
39 using folly::Try;
40
41 TEST(FiberManager, batonTimedWaitTimeout) {
42   bool taskAdded = false;
43   size_t iterations = 0;
44
45   FiberManager manager(folly::make_unique<SimpleLoopController>());
46   auto& loopController =
47       dynamic_cast<SimpleLoopController&>(manager.loopController());
48
49   auto loopFunc = [&]() {
50     if (!taskAdded) {
51       manager.addTask([&]() {
52         Baton baton;
53
54         auto res = baton.timed_wait(std::chrono::milliseconds(230));
55
56         EXPECT_FALSE(res);
57         EXPECT_EQ(5, iterations);
58
59         loopController.stop();
60       });
61       manager.addTask([&]() {
62         Baton baton;
63
64         auto res = baton.timed_wait(std::chrono::milliseconds(130));
65
66         EXPECT_FALSE(res);
67         EXPECT_EQ(3, iterations);
68
69         loopController.stop();
70       });
71       taskAdded = true;
72     } else {
73       std::this_thread::sleep_for(std::chrono::milliseconds(50));
74       iterations++;
75     }
76   };
77
78   loopController.loop(std::move(loopFunc));
79 }
80
81 TEST(FiberManager, batonTimedWaitPost) {
82   bool taskAdded = false;
83   size_t iterations = 0;
84   Baton* baton_ptr;
85
86   FiberManager manager(folly::make_unique<SimpleLoopController>());
87   auto& loopController =
88       dynamic_cast<SimpleLoopController&>(manager.loopController());
89
90   auto loopFunc = [&]() {
91     if (!taskAdded) {
92       manager.addTask([&]() {
93         Baton baton;
94         baton_ptr = &baton;
95
96         auto res = baton.timed_wait(std::chrono::milliseconds(130));
97
98         EXPECT_TRUE(res);
99         EXPECT_EQ(2, iterations);
100
101         loopController.stop();
102       });
103       taskAdded = true;
104     } else {
105       std::this_thread::sleep_for(std::chrono::milliseconds(50));
106       iterations++;
107       if (iterations == 2) {
108         baton_ptr->post();
109       }
110     }
111   };
112
113   loopController.loop(std::move(loopFunc));
114 }
115
116 TEST(FiberManager, batonTimedWaitTimeoutEvb) {
117   size_t tasksComplete = 0;
118
119   folly::EventBase evb;
120
121   FiberManager manager(folly::make_unique<EventBaseLoopController>());
122   dynamic_cast<EventBaseLoopController&>(manager.loopController())
123       .attachEventBase(evb);
124
125   auto task = [&](size_t timeout_ms) {
126     Baton baton;
127
128     auto start = EventBaseLoopController::Clock::now();
129     auto res = baton.timed_wait(std::chrono::milliseconds(timeout_ms));
130     auto finish = EventBaseLoopController::Clock::now();
131
132     EXPECT_FALSE(res);
133
134     auto duration_ms =
135         std::chrono::duration_cast<std::chrono::milliseconds>(finish - start);
136
137     EXPECT_GT(duration_ms.count(), timeout_ms - 50);
138     EXPECT_LT(duration_ms.count(), timeout_ms + 50);
139
140     if (++tasksComplete == 2) {
141       evb.terminateLoopSoon();
142     }
143   };
144
145   evb.runInEventBaseThread([&]() {
146     manager.addTask([&]() { task(500); });
147     manager.addTask([&]() { task(250); });
148   });
149
150   evb.loopForever();
151
152   EXPECT_EQ(2, tasksComplete);
153 }
154
155 TEST(FiberManager, batonTimedWaitPostEvb) {
156   size_t tasksComplete = 0;
157
158   folly::EventBase evb;
159
160   FiberManager manager(folly::make_unique<EventBaseLoopController>());
161   dynamic_cast<EventBaseLoopController&>(manager.loopController())
162       .attachEventBase(evb);
163
164   evb.runInEventBaseThread([&]() {
165     manager.addTask([&]() {
166       Baton baton;
167
168       evb.tryRunAfterDelay([&]() { baton.post(); }, 100);
169
170       auto start = EventBaseLoopController::Clock::now();
171       auto res = baton.timed_wait(std::chrono::milliseconds(130));
172       auto finish = EventBaseLoopController::Clock::now();
173
174       EXPECT_TRUE(res);
175
176       auto duration_ms =
177           std::chrono::duration_cast<std::chrono::milliseconds>(finish - start);
178
179       EXPECT_TRUE(duration_ms.count() > 95 && duration_ms.count() < 110);
180
181       if (++tasksComplete == 1) {
182         evb.terminateLoopSoon();
183       }
184     });
185   });
186
187   evb.loopForever();
188
189   EXPECT_EQ(1, tasksComplete);
190 }
191
192 TEST(FiberManager, batonTryWait) {
193   FiberManager manager(folly::make_unique<SimpleLoopController>());
194
195   // Check if try_wait and post work as expected
196   Baton b;
197
198   manager.addTask([&]() {
199     while (!b.try_wait()) {
200     }
201   });
202   auto thr = std::thread([&]() {
203     std::this_thread::sleep_for(std::chrono::milliseconds(300));
204     b.post();
205   });
206
207   manager.loopUntilNoReady();
208   thr.join();
209
210   Baton c;
211
212   // Check try_wait without post
213   manager.addTask([&]() {
214     int cnt = 100;
215     while (cnt && !c.try_wait()) {
216       cnt--;
217     }
218     EXPECT_TRUE(!c.try_wait()); // must still hold
219     EXPECT_EQ(cnt, 0);
220   });
221
222   manager.loopUntilNoReady();
223 }
224
225 TEST(FiberManager, genericBatonFiberWait) {
226   FiberManager manager(folly::make_unique<SimpleLoopController>());
227
228   GenericBaton b;
229   bool fiberRunning = false;
230
231   manager.addTask([&]() {
232     EXPECT_EQ(manager.hasActiveFiber(), true);
233     fiberRunning = true;
234     b.wait();
235     fiberRunning = false;
236   });
237
238   EXPECT_FALSE(fiberRunning);
239   manager.loopUntilNoReady();
240   EXPECT_TRUE(fiberRunning); // ensure fiber still active
241
242   auto thr = std::thread([&]() {
243     std::this_thread::sleep_for(std::chrono::milliseconds(300));
244     b.post();
245   });
246
247   while (fiberRunning) {
248     manager.loopUntilNoReady();
249   }
250
251   thr.join();
252 }
253
254 TEST(FiberManager, genericBatonThreadWait) {
255   FiberManager manager(folly::make_unique<SimpleLoopController>());
256   GenericBaton b;
257   std::atomic<bool> threadWaiting(false);
258
259   auto thr = std::thread([&]() {
260     threadWaiting = true;
261     b.wait();
262     threadWaiting = false;
263   });
264
265   while (!threadWaiting) {
266   }
267   std::this_thread::sleep_for(std::chrono::milliseconds(300));
268
269   manager.addTask([&]() {
270     EXPECT_EQ(manager.hasActiveFiber(), true);
271     EXPECT_TRUE(threadWaiting);
272     b.post();
273     while (threadWaiting) {
274     }
275   });
276
277   manager.loopUntilNoReady();
278   thr.join();
279 }
280
281 TEST(FiberManager, addTasksNoncopyable) {
282   std::vector<Promise<int>> pendingFibers;
283   bool taskAdded = false;
284
285   FiberManager manager(folly::make_unique<SimpleLoopController>());
286   auto& loopController =
287       dynamic_cast<SimpleLoopController&>(manager.loopController());
288
289   auto loopFunc = [&]() {
290     if (!taskAdded) {
291       manager.addTask([&]() {
292         std::vector<std::function<std::unique_ptr<int>()>> funcs;
293         for (size_t i = 0; i < 3; ++i) {
294           funcs.push_back([i, &pendingFibers]() {
295             await([&pendingFibers](Promise<int> promise) {
296               pendingFibers.push_back(std::move(promise));
297             });
298             return folly::make_unique<int>(i * 2 + 1);
299           });
300         }
301
302         auto iter = addTasks(funcs.begin(), funcs.end());
303
304         size_t n = 0;
305         while (iter.hasNext()) {
306           auto result = iter.awaitNext();
307           EXPECT_EQ(2 * iter.getTaskID() + 1, *result);
308           EXPECT_GE(2 - n, pendingFibers.size());
309           ++n;
310         }
311         EXPECT_EQ(3, n);
312       });
313       taskAdded = true;
314     } else if (pendingFibers.size()) {
315       pendingFibers.back().setValue(0);
316       pendingFibers.pop_back();
317     } else {
318       loopController.stop();
319     }
320   };
321
322   loopController.loop(std::move(loopFunc));
323 }
324
325 TEST(FiberManager, awaitThrow) {
326   folly::EventBase evb;
327   struct ExpectedException {};
328   getFiberManager(evb)
329       .addTaskFuture([&] {
330         EXPECT_THROW(
331           await([](Promise<int> p) {
332               p.setValue(42);
333               throw ExpectedException();
334             }),
335           ExpectedException
336         );
337
338         EXPECT_THROW(
339           await([&](Promise<int> p) {
340               evb.runInEventBaseThread([p = std::move(p)]() mutable {
341                   p.setValue(42);
342                 });
343               throw ExpectedException();
344             }),
345           ExpectedException);
346       })
347       .waitVia(&evb);
348 }
349
350 TEST(FiberManager, addTasksThrow) {
351   std::vector<Promise<int>> pendingFibers;
352   bool taskAdded = false;
353
354   FiberManager manager(folly::make_unique<SimpleLoopController>());
355   auto& loopController =
356       dynamic_cast<SimpleLoopController&>(manager.loopController());
357
358   auto loopFunc = [&]() {
359     if (!taskAdded) {
360       manager.addTask([&]() {
361         std::vector<std::function<int()>> funcs;
362         for (size_t i = 0; i < 3; ++i) {
363           funcs.push_back([i, &pendingFibers]() {
364             await([&pendingFibers](Promise<int> promise) {
365               pendingFibers.push_back(std::move(promise));
366             });
367             if (i % 2 == 0) {
368               throw std::runtime_error("Runtime");
369             }
370             return i * 2 + 1;
371           });
372         }
373
374         auto iter = addTasks(funcs.begin(), funcs.end());
375
376         size_t n = 0;
377         while (iter.hasNext()) {
378           try {
379             int result = iter.awaitNext();
380             EXPECT_EQ(1, iter.getTaskID() % 2);
381             EXPECT_EQ(2 * iter.getTaskID() + 1, result);
382           } catch (...) {
383             EXPECT_EQ(0, iter.getTaskID() % 2);
384           }
385           EXPECT_GE(2 - n, pendingFibers.size());
386           ++n;
387         }
388         EXPECT_EQ(3, n);
389       });
390       taskAdded = true;
391     } else if (pendingFibers.size()) {
392       pendingFibers.back().setValue(0);
393       pendingFibers.pop_back();
394     } else {
395       loopController.stop();
396     }
397   };
398
399   loopController.loop(std::move(loopFunc));
400 }
401
402 TEST(FiberManager, addTasksVoid) {
403   std::vector<Promise<int>> pendingFibers;
404   bool taskAdded = false;
405
406   FiberManager manager(folly::make_unique<SimpleLoopController>());
407   auto& loopController =
408       dynamic_cast<SimpleLoopController&>(manager.loopController());
409
410   auto loopFunc = [&]() {
411     if (!taskAdded) {
412       manager.addTask([&]() {
413         std::vector<std::function<void()>> funcs;
414         for (size_t i = 0; i < 3; ++i) {
415           funcs.push_back([i, &pendingFibers]() {
416             await([&pendingFibers](Promise<int> promise) {
417               pendingFibers.push_back(std::move(promise));
418             });
419           });
420         }
421
422         auto iter = addTasks(funcs.begin(), funcs.end());
423
424         size_t n = 0;
425         while (iter.hasNext()) {
426           iter.awaitNext();
427           EXPECT_GE(2 - n, pendingFibers.size());
428           ++n;
429         }
430         EXPECT_EQ(3, n);
431       });
432       taskAdded = true;
433     } else if (pendingFibers.size()) {
434       pendingFibers.back().setValue(0);
435       pendingFibers.pop_back();
436     } else {
437       loopController.stop();
438     }
439   };
440
441   loopController.loop(std::move(loopFunc));
442 }
443
444 TEST(FiberManager, addTasksVoidThrow) {
445   std::vector<Promise<int>> pendingFibers;
446   bool taskAdded = false;
447
448   FiberManager manager(folly::make_unique<SimpleLoopController>());
449   auto& loopController =
450       dynamic_cast<SimpleLoopController&>(manager.loopController());
451
452   auto loopFunc = [&]() {
453     if (!taskAdded) {
454       manager.addTask([&]() {
455         std::vector<std::function<void()>> funcs;
456         for (size_t i = 0; i < 3; ++i) {
457           funcs.push_back([i, &pendingFibers]() {
458             await([&pendingFibers](Promise<int> promise) {
459               pendingFibers.push_back(std::move(promise));
460             });
461             if (i % 2 == 0) {
462               throw std::runtime_error("");
463             }
464           });
465         }
466
467         auto iter = addTasks(funcs.begin(), funcs.end());
468
469         size_t n = 0;
470         while (iter.hasNext()) {
471           try {
472             iter.awaitNext();
473             EXPECT_EQ(1, iter.getTaskID() % 2);
474           } catch (...) {
475             EXPECT_EQ(0, iter.getTaskID() % 2);
476           }
477           EXPECT_GE(2 - n, pendingFibers.size());
478           ++n;
479         }
480         EXPECT_EQ(3, n);
481       });
482       taskAdded = true;
483     } else if (pendingFibers.size()) {
484       pendingFibers.back().setValue(0);
485       pendingFibers.pop_back();
486     } else {
487       loopController.stop();
488     }
489   };
490
491   loopController.loop(std::move(loopFunc));
492 }
493
494 TEST(FiberManager, addTasksReserve) {
495   std::vector<Promise<int>> pendingFibers;
496   bool taskAdded = false;
497
498   FiberManager manager(folly::make_unique<SimpleLoopController>());
499   auto& loopController =
500       dynamic_cast<SimpleLoopController&>(manager.loopController());
501
502   auto loopFunc = [&]() {
503     if (!taskAdded) {
504       manager.addTask([&]() {
505         std::vector<std::function<void()>> funcs;
506         for (size_t i = 0; i < 3; ++i) {
507           funcs.push_back([&pendingFibers]() {
508             await([&pendingFibers](Promise<int> promise) {
509               pendingFibers.push_back(std::move(promise));
510             });
511           });
512         }
513
514         auto iter = addTasks(funcs.begin(), funcs.end());
515
516         iter.reserve(2);
517         EXPECT_TRUE(iter.hasCompleted());
518         EXPECT_TRUE(iter.hasPending());
519         EXPECT_TRUE(iter.hasNext());
520
521         iter.awaitNext();
522         EXPECT_TRUE(iter.hasCompleted());
523         EXPECT_TRUE(iter.hasPending());
524         EXPECT_TRUE(iter.hasNext());
525
526         iter.awaitNext();
527         EXPECT_FALSE(iter.hasCompleted());
528         EXPECT_TRUE(iter.hasPending());
529         EXPECT_TRUE(iter.hasNext());
530
531         iter.awaitNext();
532         EXPECT_FALSE(iter.hasCompleted());
533         EXPECT_FALSE(iter.hasPending());
534         EXPECT_FALSE(iter.hasNext());
535       });
536       taskAdded = true;
537     } else if (pendingFibers.size()) {
538       pendingFibers.back().setValue(0);
539       pendingFibers.pop_back();
540     } else {
541       loopController.stop();
542     }
543   };
544
545   loopController.loop(std::move(loopFunc));
546 }
547
548 TEST(FiberManager, addTaskDynamic) {
549   folly::EventBase evb;
550
551   Baton batons[3];
552
553   auto makeTask = [&](size_t taskId) {
554     return [&, taskId]() -> size_t {
555       batons[taskId].wait();
556       return taskId;
557     };
558   };
559
560   getFiberManager(evb)
561       .addTaskFuture([&]() {
562         TaskIterator<size_t> iterator;
563
564         iterator.addTask(makeTask(0));
565         iterator.addTask(makeTask(1));
566
567         batons[1].post();
568
569         EXPECT_EQ(1, iterator.awaitNext());
570
571         iterator.addTask(makeTask(2));
572
573         batons[2].post();
574
575         EXPECT_EQ(2, iterator.awaitNext());
576
577         batons[0].post();
578
579         EXPECT_EQ(0, iterator.awaitNext());
580       })
581       .waitVia(&evb);
582 }
583
584 TEST(FiberManager, forEach) {
585   std::vector<Promise<int>> pendingFibers;
586   bool taskAdded = false;
587
588   FiberManager manager(folly::make_unique<SimpleLoopController>());
589   auto& loopController =
590       dynamic_cast<SimpleLoopController&>(manager.loopController());
591
592   auto loopFunc = [&]() {
593     if (!taskAdded) {
594       manager.addTask([&]() {
595         std::vector<std::function<int()>> funcs;
596         for (size_t i = 0; i < 3; ++i) {
597           funcs.push_back([i, &pendingFibers]() {
598             await([&pendingFibers](Promise<int> promise) {
599               pendingFibers.push_back(std::move(promise));
600             });
601             return i * 2 + 1;
602           });
603         }
604
605         std::vector<std::pair<size_t, int>> results;
606         forEach(funcs.begin(), funcs.end(), [&results](size_t id, int result) {
607           results.emplace_back(id, result);
608         });
609         EXPECT_EQ(3, results.size());
610         EXPECT_TRUE(pendingFibers.empty());
611         for (size_t i = 0; i < 3; ++i) {
612           EXPECT_EQ(results[i].first * 2 + 1, results[i].second);
613         }
614       });
615       taskAdded = true;
616     } else if (pendingFibers.size()) {
617       pendingFibers.back().setValue(0);
618       pendingFibers.pop_back();
619     } else {
620       loopController.stop();
621     }
622   };
623
624   loopController.loop(std::move(loopFunc));
625 }
626
627 TEST(FiberManager, collectN) {
628   std::vector<Promise<int>> pendingFibers;
629   bool taskAdded = false;
630
631   FiberManager manager(folly::make_unique<SimpleLoopController>());
632   auto& loopController =
633       dynamic_cast<SimpleLoopController&>(manager.loopController());
634
635   auto loopFunc = [&]() {
636     if (!taskAdded) {
637       manager.addTask([&]() {
638         std::vector<std::function<int()>> funcs;
639         for (size_t i = 0; i < 3; ++i) {
640           funcs.push_back([i, &pendingFibers]() {
641             await([&pendingFibers](Promise<int> promise) {
642               pendingFibers.push_back(std::move(promise));
643             });
644             return i * 2 + 1;
645           });
646         }
647
648         auto results = collectN(funcs.begin(), funcs.end(), 2);
649         EXPECT_EQ(2, results.size());
650         EXPECT_EQ(1, pendingFibers.size());
651         for (size_t i = 0; i < 2; ++i) {
652           EXPECT_EQ(results[i].first * 2 + 1, results[i].second);
653         }
654       });
655       taskAdded = true;
656     } else if (pendingFibers.size()) {
657       pendingFibers.back().setValue(0);
658       pendingFibers.pop_back();
659     } else {
660       loopController.stop();
661     }
662   };
663
664   loopController.loop(std::move(loopFunc));
665 }
666
667 TEST(FiberManager, collectNThrow) {
668   std::vector<Promise<int>> pendingFibers;
669   bool taskAdded = false;
670
671   FiberManager manager(folly::make_unique<SimpleLoopController>());
672   auto& loopController =
673       dynamic_cast<SimpleLoopController&>(manager.loopController());
674
675   auto loopFunc = [&]() {
676     if (!taskAdded) {
677       manager.addTask([&]() {
678         std::vector<std::function<int()>> funcs;
679         for (size_t i = 0; i < 3; ++i) {
680           funcs.push_back([i, &pendingFibers]() {
681             await([&pendingFibers](Promise<int> promise) {
682               pendingFibers.push_back(std::move(promise));
683             });
684             throw std::runtime_error("Runtime");
685             return i * 2 + 1;
686           });
687         }
688
689         try {
690           collectN(funcs.begin(), funcs.end(), 2);
691         } catch (...) {
692           EXPECT_EQ(1, pendingFibers.size());
693         }
694       });
695       taskAdded = true;
696     } else if (pendingFibers.size()) {
697       pendingFibers.back().setValue(0);
698       pendingFibers.pop_back();
699     } else {
700       loopController.stop();
701     }
702   };
703
704   loopController.loop(std::move(loopFunc));
705 }
706
707 TEST(FiberManager, collectNVoid) {
708   std::vector<Promise<int>> pendingFibers;
709   bool taskAdded = false;
710
711   FiberManager manager(folly::make_unique<SimpleLoopController>());
712   auto& loopController =
713       dynamic_cast<SimpleLoopController&>(manager.loopController());
714
715   auto loopFunc = [&]() {
716     if (!taskAdded) {
717       manager.addTask([&]() {
718         std::vector<std::function<void()>> funcs;
719         for (size_t i = 0; i < 3; ++i) {
720           funcs.push_back([i, &pendingFibers]() {
721             await([&pendingFibers](Promise<int> promise) {
722               pendingFibers.push_back(std::move(promise));
723             });
724           });
725         }
726
727         auto results = collectN(funcs.begin(), funcs.end(), 2);
728         EXPECT_EQ(2, results.size());
729         EXPECT_EQ(1, pendingFibers.size());
730       });
731       taskAdded = true;
732     } else if (pendingFibers.size()) {
733       pendingFibers.back().setValue(0);
734       pendingFibers.pop_back();
735     } else {
736       loopController.stop();
737     }
738   };
739
740   loopController.loop(std::move(loopFunc));
741 }
742
743 TEST(FiberManager, collectNVoidThrow) {
744   std::vector<Promise<int>> pendingFibers;
745   bool taskAdded = false;
746
747   FiberManager manager(folly::make_unique<SimpleLoopController>());
748   auto& loopController =
749       dynamic_cast<SimpleLoopController&>(manager.loopController());
750
751   auto loopFunc = [&]() {
752     if (!taskAdded) {
753       manager.addTask([&]() {
754         std::vector<std::function<void()>> funcs;
755         for (size_t i = 0; i < 3; ++i) {
756           funcs.push_back([i, &pendingFibers]() {
757             await([&pendingFibers](Promise<int> promise) {
758               pendingFibers.push_back(std::move(promise));
759             });
760             throw std::runtime_error("Runtime");
761           });
762         }
763
764         try {
765           collectN(funcs.begin(), funcs.end(), 2);
766         } catch (...) {
767           EXPECT_EQ(1, pendingFibers.size());
768         }
769       });
770       taskAdded = true;
771     } else if (pendingFibers.size()) {
772       pendingFibers.back().setValue(0);
773       pendingFibers.pop_back();
774     } else {
775       loopController.stop();
776     }
777   };
778
779   loopController.loop(std::move(loopFunc));
780 }
781
782 TEST(FiberManager, collectAll) {
783   std::vector<Promise<int>> pendingFibers;
784   bool taskAdded = false;
785
786   FiberManager manager(folly::make_unique<SimpleLoopController>());
787   auto& loopController =
788       dynamic_cast<SimpleLoopController&>(manager.loopController());
789
790   auto loopFunc = [&]() {
791     if (!taskAdded) {
792       manager.addTask([&]() {
793         std::vector<std::function<int()>> funcs;
794         for (size_t i = 0; i < 3; ++i) {
795           funcs.push_back([i, &pendingFibers]() {
796             await([&pendingFibers](Promise<int> promise) {
797               pendingFibers.push_back(std::move(promise));
798             });
799             return i * 2 + 1;
800           });
801         }
802
803         auto results = collectAll(funcs.begin(), funcs.end());
804         EXPECT_TRUE(pendingFibers.empty());
805         for (size_t i = 0; i < 3; ++i) {
806           EXPECT_EQ(i * 2 + 1, results[i]);
807         }
808       });
809       taskAdded = true;
810     } else if (pendingFibers.size()) {
811       pendingFibers.back().setValue(0);
812       pendingFibers.pop_back();
813     } else {
814       loopController.stop();
815     }
816   };
817
818   loopController.loop(std::move(loopFunc));
819 }
820
821 TEST(FiberManager, collectAllVoid) {
822   std::vector<Promise<int>> pendingFibers;
823   bool taskAdded = false;
824
825   FiberManager manager(folly::make_unique<SimpleLoopController>());
826   auto& loopController =
827       dynamic_cast<SimpleLoopController&>(manager.loopController());
828
829   auto loopFunc = [&]() {
830     if (!taskAdded) {
831       manager.addTask([&]() {
832         std::vector<std::function<void()>> funcs;
833         for (size_t i = 0; i < 3; ++i) {
834           funcs.push_back([i, &pendingFibers]() {
835             await([&pendingFibers](Promise<int> promise) {
836               pendingFibers.push_back(std::move(promise));
837             });
838           });
839         }
840
841         collectAll(funcs.begin(), funcs.end());
842         EXPECT_TRUE(pendingFibers.empty());
843       });
844       taskAdded = true;
845     } else if (pendingFibers.size()) {
846       pendingFibers.back().setValue(0);
847       pendingFibers.pop_back();
848     } else {
849       loopController.stop();
850     }
851   };
852
853   loopController.loop(std::move(loopFunc));
854 }
855
856 TEST(FiberManager, collectAny) {
857   std::vector<Promise<int>> pendingFibers;
858   bool taskAdded = false;
859
860   FiberManager manager(folly::make_unique<SimpleLoopController>());
861   auto& loopController =
862       dynamic_cast<SimpleLoopController&>(manager.loopController());
863
864   auto loopFunc = [&]() {
865     if (!taskAdded) {
866       manager.addTask([&]() {
867         std::vector<std::function<int()>> funcs;
868         for (size_t i = 0; i < 3; ++i) {
869           funcs.push_back([i, &pendingFibers]() {
870             await([&pendingFibers](Promise<int> promise) {
871               pendingFibers.push_back(std::move(promise));
872             });
873             if (i == 1) {
874               throw std::runtime_error("This exception will be ignored");
875             }
876             return i * 2 + 1;
877           });
878         }
879
880         auto result = collectAny(funcs.begin(), funcs.end());
881         EXPECT_EQ(2, pendingFibers.size());
882         EXPECT_EQ(2, result.first);
883         EXPECT_EQ(2 * 2 + 1, result.second);
884       });
885       taskAdded = true;
886     } else if (pendingFibers.size()) {
887       pendingFibers.back().setValue(0);
888       pendingFibers.pop_back();
889     } else {
890       loopController.stop();
891     }
892   };
893
894   loopController.loop(std::move(loopFunc));
895 }
896
897 namespace {
898 /* Checks that this function was run from a main context,
899    by comparing an address on a stack to a known main stack address
900    and a known related fiber stack address.  The assumption
901    is that fiber stack and main stack will be far enough apart,
902    while any two values on the same stack will be close. */
903 void expectMainContext(bool& ran, int* mainLocation, int* fiberLocation) {
904   int here;
905   /* 2 pages is a good guess */
906   constexpr ssize_t DISTANCE = 0x2000 / sizeof(int);
907   if (fiberLocation) {
908     EXPECT_TRUE(std::abs(&here - fiberLocation) > DISTANCE);
909   }
910   if (mainLocation) {
911     EXPECT_TRUE(std::abs(&here - mainLocation) < DISTANCE);
912   }
913
914   EXPECT_FALSE(ran);
915   ran = true;
916 }
917 }
918
919 TEST(FiberManager, runInMainContext) {
920   FiberManager manager(folly::make_unique<SimpleLoopController>());
921   auto& loopController =
922       dynamic_cast<SimpleLoopController&>(manager.loopController());
923
924   bool checkRan = false;
925
926   int mainLocation;
927   manager.runInMainContext(
928       [&]() { expectMainContext(checkRan, &mainLocation, nullptr); });
929   EXPECT_TRUE(checkRan);
930
931   checkRan = false;
932
933   manager.addTask([&]() {
934     struct A {
935       explicit A(int value_) : value(value_) {}
936       A(const A&) = delete;
937       A(A&&) = default;
938
939       int value;
940     };
941     int stackLocation;
942     auto ret = runInMainContext([&]() {
943       expectMainContext(checkRan, &mainLocation, &stackLocation);
944       return A(42);
945     });
946     EXPECT_TRUE(checkRan);
947     EXPECT_EQ(42, ret.value);
948   });
949
950   loopController.loop([&]() { loopController.stop(); });
951
952   EXPECT_TRUE(checkRan);
953 }
954
955 TEST(FiberManager, addTaskFinally) {
956   FiberManager manager(folly::make_unique<SimpleLoopController>());
957   auto& loopController =
958       dynamic_cast<SimpleLoopController&>(manager.loopController());
959
960   bool checkRan = false;
961
962   int mainLocation;
963
964   manager.addTaskFinally(
965       [&]() { return 1234; },
966       [&](Try<int>&& result) {
967         EXPECT_EQ(result.value(), 1234);
968
969         expectMainContext(checkRan, &mainLocation, nullptr);
970       });
971
972   EXPECT_FALSE(checkRan);
973
974   loopController.loop([&]() { loopController.stop(); });
975
976   EXPECT_TRUE(checkRan);
977 }
978
979 TEST(FiberManager, fibersPoolWithinLimit) {
980   FiberManager::Options opts;
981   opts.maxFibersPoolSize = 5;
982
983   FiberManager manager(folly::make_unique<SimpleLoopController>(), opts);
984   auto& loopController =
985       dynamic_cast<SimpleLoopController&>(manager.loopController());
986
987   size_t fibersRun = 0;
988
989   for (size_t i = 0; i < 5; ++i) {
990     manager.addTask([&]() { ++fibersRun; });
991   }
992   loopController.loop([&]() { loopController.stop(); });
993
994   EXPECT_EQ(5, fibersRun);
995   EXPECT_EQ(5, manager.fibersAllocated());
996   EXPECT_EQ(5, manager.fibersPoolSize());
997
998   for (size_t i = 0; i < 5; ++i) {
999     manager.addTask([&]() { ++fibersRun; });
1000   }
1001   loopController.loop([&]() { loopController.stop(); });
1002
1003   EXPECT_EQ(10, fibersRun);
1004   EXPECT_EQ(5, manager.fibersAllocated());
1005   EXPECT_EQ(5, manager.fibersPoolSize());
1006 }
1007
1008 TEST(FiberManager, fibersPoolOverLimit) {
1009   FiberManager::Options opts;
1010   opts.maxFibersPoolSize = 5;
1011
1012   FiberManager manager(folly::make_unique<SimpleLoopController>(), opts);
1013   auto& loopController =
1014       dynamic_cast<SimpleLoopController&>(manager.loopController());
1015
1016   size_t fibersRun = 0;
1017
1018   for (size_t i = 0; i < 10; ++i) {
1019     manager.addTask([&]() { ++fibersRun; });
1020   }
1021
1022   EXPECT_EQ(0, fibersRun);
1023   EXPECT_EQ(10, manager.fibersAllocated());
1024   EXPECT_EQ(0, manager.fibersPoolSize());
1025
1026   loopController.loop([&]() { loopController.stop(); });
1027
1028   EXPECT_EQ(10, fibersRun);
1029   EXPECT_EQ(5, manager.fibersAllocated());
1030   EXPECT_EQ(5, manager.fibersPoolSize());
1031 }
1032
1033 TEST(FiberManager, remoteFiberBasic) {
1034   FiberManager manager(folly::make_unique<SimpleLoopController>());
1035   auto& loopController =
1036       dynamic_cast<SimpleLoopController&>(manager.loopController());
1037
1038   int result[2];
1039   result[0] = result[1] = 0;
1040   folly::Optional<Promise<int>> savedPromise[2];
1041   manager.addTask([&]() {
1042     result[0] = await(
1043         [&](Promise<int> promise) { savedPromise[0] = std::move(promise); });
1044   });
1045   manager.addTask([&]() {
1046     result[1] = await(
1047         [&](Promise<int> promise) { savedPromise[1] = std::move(promise); });
1048   });
1049
1050   manager.loopUntilNoReady();
1051
1052   EXPECT_TRUE(savedPromise[0].hasValue());
1053   EXPECT_TRUE(savedPromise[1].hasValue());
1054   EXPECT_EQ(0, result[0]);
1055   EXPECT_EQ(0, result[1]);
1056
1057   std::thread remoteThread0{[&]() { savedPromise[0]->setValue(42); }};
1058   std::thread remoteThread1{[&]() { savedPromise[1]->setValue(43); }};
1059   remoteThread0.join();
1060   remoteThread1.join();
1061   EXPECT_EQ(0, result[0]);
1062   EXPECT_EQ(0, result[1]);
1063   /* Should only have scheduled once */
1064   EXPECT_EQ(1, loopController.remoteScheduleCalled());
1065
1066   manager.loopUntilNoReady();
1067   EXPECT_EQ(42, result[0]);
1068   EXPECT_EQ(43, result[1]);
1069 }
1070
1071 TEST(FiberManager, addTaskRemoteBasic) {
1072   FiberManager manager(folly::make_unique<SimpleLoopController>());
1073
1074   int result[2];
1075   result[0] = result[1] = 0;
1076   folly::Optional<Promise<int>> savedPromise[2];
1077
1078   std::thread remoteThread0{[&]() {
1079     manager.addTaskRemote([&]() {
1080       result[0] = await(
1081           [&](Promise<int> promise) { savedPromise[0] = std::move(promise); });
1082     });
1083   }};
1084   std::thread remoteThread1{[&]() {
1085     manager.addTaskRemote([&]() {
1086       result[1] = await(
1087           [&](Promise<int> promise) { savedPromise[1] = std::move(promise); });
1088     });
1089   }};
1090   remoteThread0.join();
1091   remoteThread1.join();
1092
1093   manager.loopUntilNoReady();
1094
1095   EXPECT_TRUE(savedPromise[0].hasValue());
1096   EXPECT_TRUE(savedPromise[1].hasValue());
1097   EXPECT_EQ(0, result[0]);
1098   EXPECT_EQ(0, result[1]);
1099
1100   savedPromise[0]->setValue(42);
1101   savedPromise[1]->setValue(43);
1102
1103   EXPECT_EQ(0, result[0]);
1104   EXPECT_EQ(0, result[1]);
1105
1106   manager.loopUntilNoReady();
1107   EXPECT_EQ(42, result[0]);
1108   EXPECT_EQ(43, result[1]);
1109 }
1110
1111 TEST(FiberManager, remoteHasTasks) {
1112   size_t counter = 0;
1113   FiberManager fm(folly::make_unique<SimpleLoopController>());
1114   std::thread remote([&]() { fm.addTaskRemote([&]() { ++counter; }); });
1115
1116   remote.join();
1117
1118   while (fm.hasTasks()) {
1119     fm.loopUntilNoReady();
1120   }
1121
1122   EXPECT_FALSE(fm.hasTasks());
1123   EXPECT_EQ(counter, 1);
1124 }
1125
1126 TEST(FiberManager, remoteHasReadyTasks) {
1127   int result = 0;
1128   folly::Optional<Promise<int>> savedPromise;
1129   FiberManager fm(folly::make_unique<SimpleLoopController>());
1130   std::thread remote([&]() {
1131     fm.addTaskRemote([&]() {
1132       result = await(
1133           [&](Promise<int> promise) { savedPromise = std::move(promise); });
1134       EXPECT_TRUE(fm.hasTasks());
1135     });
1136   });
1137
1138   remote.join();
1139   EXPECT_TRUE(fm.hasTasks());
1140
1141   fm.loopUntilNoReady();
1142   EXPECT_TRUE(fm.hasTasks());
1143
1144   std::thread remote2([&]() { savedPromise->setValue(47); });
1145   remote2.join();
1146   EXPECT_TRUE(fm.hasTasks());
1147
1148   fm.loopUntilNoReady();
1149   EXPECT_FALSE(fm.hasTasks());
1150
1151   EXPECT_EQ(result, 47);
1152 }
1153
1154 template <typename Data>
1155 void testFiberLocal() {
1156   FiberManager fm(
1157       LocalType<Data>(), folly::make_unique<SimpleLoopController>());
1158
1159   fm.addTask([]() {
1160     EXPECT_EQ(42, local<Data>().value);
1161
1162     local<Data>().value = 43;
1163
1164     addTask([]() {
1165       EXPECT_EQ(43, local<Data>().value);
1166
1167       local<Data>().value = 44;
1168
1169       addTask([]() { EXPECT_EQ(44, local<Data>().value); });
1170     });
1171   });
1172
1173   fm.addTask([&]() {
1174     EXPECT_EQ(42, local<Data>().value);
1175
1176     local<Data>().value = 43;
1177
1178     fm.addTaskRemote([]() { EXPECT_EQ(43, local<Data>().value); });
1179   });
1180
1181   fm.addTask([]() {
1182     EXPECT_EQ(42, local<Data>().value);
1183     local<Data>().value = 43;
1184
1185     auto task = []() {
1186       EXPECT_EQ(43, local<Data>().value);
1187       local<Data>().value = 44;
1188     };
1189     std::vector<std::function<void()>> tasks{task};
1190     collectAny(tasks.begin(), tasks.end());
1191
1192     EXPECT_EQ(43, local<Data>().value);
1193   });
1194
1195   fm.loopUntilNoReady();
1196   EXPECT_FALSE(fm.hasTasks());
1197 }
1198
1199 TEST(FiberManager, fiberLocal) {
1200   struct SimpleData {
1201     int value{42};
1202   };
1203
1204   testFiberLocal<SimpleData>();
1205 }
1206
1207 TEST(FiberManager, fiberLocalHeap) {
1208   struct LargeData {
1209     char _[1024 * 1024];
1210     int value{42};
1211   };
1212
1213   testFiberLocal<LargeData>();
1214 }
1215
1216 TEST(FiberManager, fiberLocalDestructor) {
1217   struct CrazyData {
1218     size_t data{42};
1219
1220     ~CrazyData() {
1221       if (data == 41) {
1222         addTask([]() {
1223           EXPECT_EQ(42, local<CrazyData>().data);
1224           // Make sure we don't have infinite loop
1225           local<CrazyData>().data = 0;
1226         });
1227       }
1228     }
1229   };
1230
1231   FiberManager fm(
1232       LocalType<CrazyData>(), folly::make_unique<SimpleLoopController>());
1233
1234   fm.addTask([]() { local<CrazyData>().data = 41; });
1235
1236   fm.loopUntilNoReady();
1237   EXPECT_FALSE(fm.hasTasks());
1238 }
1239
1240 TEST(FiberManager, yieldTest) {
1241   FiberManager manager(folly::make_unique<SimpleLoopController>());
1242   auto& loopController =
1243       dynamic_cast<SimpleLoopController&>(manager.loopController());
1244
1245   bool checkRan = false;
1246
1247   manager.addTask([&]() {
1248     manager.yield();
1249     checkRan = true;
1250   });
1251
1252   loopController.loop([&]() {
1253     if (checkRan) {
1254       loopController.stop();
1255     }
1256   });
1257
1258   EXPECT_TRUE(checkRan);
1259 }
1260
1261 TEST(FiberManager, RequestContext) {
1262   FiberManager fm(folly::make_unique<SimpleLoopController>());
1263
1264   bool checkRun1 = false;
1265   bool checkRun2 = false;
1266   bool checkRun3 = false;
1267   bool checkRun4 = false;
1268   folly::fibers::Baton baton1;
1269   folly::fibers::Baton baton2;
1270   folly::fibers::Baton baton3;
1271   folly::fibers::Baton baton4;
1272
1273   {
1274     folly::RequestContextScopeGuard rctx;
1275     auto rcontext1 = folly::RequestContext::get();
1276     fm.addTask([&, rcontext1]() {
1277       EXPECT_EQ(rcontext1, folly::RequestContext::get());
1278       baton1.wait(
1279           [&]() { EXPECT_EQ(rcontext1, folly::RequestContext::get()); });
1280       EXPECT_EQ(rcontext1, folly::RequestContext::get());
1281       runInMainContext(
1282           [&]() { EXPECT_EQ(rcontext1, folly::RequestContext::get()); });
1283       checkRun1 = true;
1284     });
1285   }
1286   {
1287     folly::RequestContextScopeGuard rctx;
1288     auto rcontext2 = folly::RequestContext::get();
1289     fm.addTaskRemote([&, rcontext2]() {
1290       EXPECT_EQ(rcontext2, folly::RequestContext::get());
1291       baton2.wait();
1292       EXPECT_EQ(rcontext2, folly::RequestContext::get());
1293       checkRun2 = true;
1294     });
1295   }
1296   {
1297     folly::RequestContextScopeGuard rctx;
1298     auto rcontext3 = folly::RequestContext::get();
1299     fm.addTaskFinally(
1300         [&, rcontext3]() {
1301           EXPECT_EQ(rcontext3, folly::RequestContext::get());
1302           baton3.wait();
1303           EXPECT_EQ(rcontext3, folly::RequestContext::get());
1304
1305           return folly::Unit();
1306         },
1307         [&, rcontext3](Try<folly::Unit>&& /* t */) {
1308           EXPECT_EQ(rcontext3, folly::RequestContext::get());
1309           checkRun3 = true;
1310         });
1311   }
1312   {
1313     folly::RequestContext::setContext(nullptr);
1314     fm.addTask([&]() {
1315       folly::RequestContextScopeGuard rctx;
1316       auto rcontext4 = folly::RequestContext::get();
1317       baton4.wait();
1318       EXPECT_EQ(rcontext4, folly::RequestContext::get());
1319       checkRun4 = true;
1320     });
1321   }
1322   {
1323     folly::RequestContextScopeGuard rctx;
1324     auto rcontext = folly::RequestContext::get();
1325
1326     fm.loopUntilNoReady();
1327     EXPECT_EQ(rcontext, folly::RequestContext::get());
1328
1329     baton1.post();
1330     EXPECT_EQ(rcontext, folly::RequestContext::get());
1331     fm.loopUntilNoReady();
1332     EXPECT_TRUE(checkRun1);
1333     EXPECT_EQ(rcontext, folly::RequestContext::get());
1334
1335     baton2.post();
1336     EXPECT_EQ(rcontext, folly::RequestContext::get());
1337     fm.loopUntilNoReady();
1338     EXPECT_TRUE(checkRun2);
1339     EXPECT_EQ(rcontext, folly::RequestContext::get());
1340
1341     baton3.post();
1342     EXPECT_EQ(rcontext, folly::RequestContext::get());
1343     fm.loopUntilNoReady();
1344     EXPECT_TRUE(checkRun3);
1345     EXPECT_EQ(rcontext, folly::RequestContext::get());
1346
1347     baton4.post();
1348     EXPECT_EQ(rcontext, folly::RequestContext::get());
1349     fm.loopUntilNoReady();
1350     EXPECT_TRUE(checkRun4);
1351     EXPECT_EQ(rcontext, folly::RequestContext::get());
1352   }
1353 }
1354
1355 TEST(FiberManager, resizePeriodically) {
1356   FiberManager::Options opts;
1357   opts.fibersPoolResizePeriodMs = 300;
1358   opts.maxFibersPoolSize = 5;
1359
1360   FiberManager manager(folly::make_unique<EventBaseLoopController>(), opts);
1361
1362   folly::EventBase evb;
1363   dynamic_cast<EventBaseLoopController&>(manager.loopController())
1364       .attachEventBase(evb);
1365
1366   std::vector<Baton> batons(10);
1367
1368   size_t tasksRun = 0;
1369   for (size_t i = 0; i < 30; ++i) {
1370     manager.addTask([i, &batons, &tasksRun]() {
1371       ++tasksRun;
1372       // Keep some fibers active indefinitely
1373       if (i < batons.size()) {
1374         batons[i].wait();
1375       }
1376     });
1377   }
1378
1379   EXPECT_EQ(0, tasksRun);
1380   EXPECT_EQ(30, manager.fibersAllocated());
1381   EXPECT_EQ(0, manager.fibersPoolSize());
1382
1383   evb.loopOnce();
1384   EXPECT_EQ(30, tasksRun);
1385   EXPECT_EQ(30, manager.fibersAllocated());
1386   // Can go over maxFibersPoolSize, 10 of 30 fibers still active
1387   EXPECT_EQ(20, manager.fibersPoolSize());
1388
1389   std::this_thread::sleep_for(std::chrono::milliseconds(400));
1390   evb.loopOnce(); // no fibers active in this period
1391   EXPECT_EQ(30, manager.fibersAllocated());
1392   EXPECT_EQ(20, manager.fibersPoolSize());
1393
1394   std::this_thread::sleep_for(std::chrono::milliseconds(400));
1395   evb.loopOnce(); // should shrink fibers pool to maxFibersPoolSize
1396   EXPECT_EQ(15, manager.fibersAllocated());
1397   EXPECT_EQ(5, manager.fibersPoolSize());
1398
1399   for (size_t i = 0; i < batons.size(); ++i) {
1400     batons[i].post();
1401   }
1402   evb.loopOnce();
1403   EXPECT_EQ(15, manager.fibersAllocated());
1404   EXPECT_EQ(15, manager.fibersPoolSize());
1405
1406   std::this_thread::sleep_for(std::chrono::milliseconds(400));
1407   evb.loopOnce(); // 10 fibers active in last period
1408   EXPECT_EQ(10, manager.fibersAllocated());
1409   EXPECT_EQ(10, manager.fibersPoolSize());
1410
1411   std::this_thread::sleep_for(std::chrono::milliseconds(400));
1412   evb.loopOnce();
1413   EXPECT_EQ(5, manager.fibersAllocated());
1414   EXPECT_EQ(5, manager.fibersPoolSize());
1415 }
1416
1417 TEST(FiberManager, batonWaitTimeoutHandler) {
1418   FiberManager manager(folly::make_unique<EventBaseLoopController>());
1419
1420   folly::EventBase evb;
1421   dynamic_cast<EventBaseLoopController&>(manager.loopController())
1422       .attachEventBase(evb);
1423
1424   size_t fibersRun = 0;
1425   Baton baton;
1426   Baton::TimeoutHandler timeoutHandler;
1427
1428   manager.addTask([&]() {
1429     baton.wait(timeoutHandler);
1430     ++fibersRun;
1431   });
1432   manager.loopUntilNoReady();
1433
1434   EXPECT_FALSE(baton.try_wait());
1435   EXPECT_EQ(0, fibersRun);
1436
1437   timeoutHandler.scheduleTimeout(std::chrono::milliseconds(250));
1438   std::this_thread::sleep_for(std::chrono::milliseconds(500));
1439
1440   EXPECT_FALSE(baton.try_wait());
1441   EXPECT_EQ(0, fibersRun);
1442
1443   evb.loopOnce();
1444   manager.loopUntilNoReady();
1445
1446   EXPECT_EQ(1, fibersRun);
1447 }
1448
1449 TEST(FiberManager, batonWaitTimeoutMany) {
1450   FiberManager manager(folly::make_unique<EventBaseLoopController>());
1451
1452   folly::EventBase evb;
1453   dynamic_cast<EventBaseLoopController&>(manager.loopController())
1454       .attachEventBase(evb);
1455
1456   constexpr size_t kNumTimeoutTasks = 10000;
1457   size_t tasksCount = kNumTimeoutTasks;
1458
1459   // We add many tasks to hit timeout queue deallocation logic.
1460   for (size_t i = 0; i < kNumTimeoutTasks; ++i) {
1461     manager.addTask([&]() {
1462       Baton baton;
1463       Baton::TimeoutHandler timeoutHandler;
1464
1465       folly::fibers::addTask([&] {
1466         timeoutHandler.scheduleTimeout(std::chrono::milliseconds(1000));
1467       });
1468
1469       baton.wait(timeoutHandler);
1470       if (--tasksCount == 0) {
1471         evb.terminateLoopSoon();
1472       }
1473     });
1474   }
1475
1476   evb.loopForever();
1477 }
1478
1479 TEST(FiberManager, remoteFutureTest) {
1480   FiberManager fiberManager(folly::make_unique<SimpleLoopController>());
1481   auto& loopController =
1482       dynamic_cast<SimpleLoopController&>(fiberManager.loopController());
1483
1484   int testValue1 = 5;
1485   int testValue2 = 7;
1486   auto f1 = fiberManager.addTaskFuture([&]() { return testValue1; });
1487   auto f2 = fiberManager.addTaskRemoteFuture([&]() { return testValue2; });
1488   loopController.loop([&]() { loopController.stop(); });
1489   auto v1 = f1.get();
1490   auto v2 = f2.get();
1491
1492   EXPECT_EQ(v1, testValue1);
1493   EXPECT_EQ(v2, testValue2);
1494 }
1495
1496 // Test that a void function produes a Future<Unit>.
1497 TEST(FiberManager, remoteFutureVoidUnitTest) {
1498   FiberManager fiberManager(folly::make_unique<SimpleLoopController>());
1499   auto& loopController =
1500       dynamic_cast<SimpleLoopController&>(fiberManager.loopController());
1501
1502   bool ranLocal = false;
1503   folly::Future<folly::Unit> futureLocal =
1504       fiberManager.addTaskFuture([&]() { ranLocal = true; });
1505
1506   bool ranRemote = false;
1507   folly::Future<folly::Unit> futureRemote =
1508       fiberManager.addTaskRemoteFuture([&]() { ranRemote = true; });
1509
1510   loopController.loop([&]() { loopController.stop(); });
1511
1512   futureLocal.wait();
1513   ASSERT_TRUE(ranLocal);
1514
1515   futureRemote.wait();
1516   ASSERT_TRUE(ranRemote);
1517 }
1518
1519 TEST(FiberManager, nestedFiberManagers) {
1520   folly::EventBase outerEvb;
1521   folly::EventBase innerEvb;
1522
1523   getFiberManager(outerEvb).addTask([&]() {
1524     EXPECT_EQ(
1525         &getFiberManager(outerEvb), FiberManager::getFiberManagerUnsafe());
1526
1527     runInMainContext([&]() {
1528       getFiberManager(innerEvb).addTask([&]() {
1529         EXPECT_EQ(
1530             &getFiberManager(innerEvb), FiberManager::getFiberManagerUnsafe());
1531
1532         innerEvb.terminateLoopSoon();
1533       });
1534
1535       innerEvb.loopForever();
1536     });
1537
1538     EXPECT_EQ(
1539         &getFiberManager(outerEvb), FiberManager::getFiberManagerUnsafe());
1540
1541     outerEvb.terminateLoopSoon();
1542   });
1543
1544   outerEvb.loopForever();
1545 }
1546
1547 TEST(FiberManager, semaphore) {
1548   constexpr size_t kTasks = 10;
1549   constexpr size_t kIterations = 10000;
1550   constexpr size_t kNumTokens = 10;
1551
1552   Semaphore sem(kNumTokens);
1553   int counterA = 0;
1554   int counterB = 0;
1555
1556   auto task = [&sem, kTasks, kIterations, kNumTokens](
1557       int& counter, folly::fibers::Baton& baton) {
1558     FiberManager manager(folly::make_unique<EventBaseLoopController>());
1559     folly::EventBase evb;
1560     dynamic_cast<EventBaseLoopController&>(manager.loopController())
1561         .attachEventBase(evb);
1562
1563     {
1564       std::shared_ptr<folly::EventBase> completionCounter(
1565           &evb, [](folly::EventBase* evb) { evb->terminateLoopSoon(); });
1566
1567       for (size_t i = 0; i < kTasks; ++i) {
1568         manager.addTask([&, completionCounter]() {
1569           for (size_t j = 0; j < kIterations; ++j) {
1570             sem.wait();
1571             ++counter;
1572             sem.signal();
1573             --counter;
1574
1575             EXPECT_LT(counter, kNumTokens);
1576             EXPECT_GE(counter, 0);
1577           }
1578         });
1579       }
1580
1581       baton.wait();
1582     }
1583     evb.loopForever();
1584   };
1585
1586   folly::fibers::Baton batonA;
1587   folly::fibers::Baton batonB;
1588   std::thread threadA([&] { task(counterA, batonA); });
1589   std::thread threadB([&] { task(counterB, batonB); });
1590
1591   batonA.post();
1592   batonB.post();
1593   threadA.join();
1594   threadB.join();
1595
1596   EXPECT_LT(counterA, kNumTokens);
1597   EXPECT_LT(counterB, kNumTokens);
1598   EXPECT_GE(counterA, 0);
1599   EXPECT_GE(counterB, 0);
1600 }
1601
1602 template <typename ExecutorT>
1603 void singleBatchDispatch(ExecutorT& executor, int batchSize, int index) {
1604   thread_local BatchDispatcher<int, std::string, ExecutorT> batchDispatcher(
1605       executor, [=](std::vector<int>&& batch) {
1606         EXPECT_EQ(batchSize, batch.size());
1607         std::vector<std::string> results;
1608         for (auto& it : batch) {
1609           results.push_back(folly::to<std::string>(it));
1610         }
1611         return results;
1612       });
1613
1614   auto indexCopy = index;
1615   auto result = batchDispatcher.add(std::move(indexCopy));
1616   EXPECT_EQ(folly::to<std::string>(index), result.get());
1617 }
1618
1619 TEST(FiberManager, batchDispatchTest) {
1620   folly::EventBase evb;
1621   auto& executor = getFiberManager(evb);
1622
1623   // Launch multiple fibers with a single id.
1624   executor.add([&]() {
1625     int batchSize = 10;
1626     for (int i = 0; i < batchSize; i++) {
1627       executor.add(
1628           [=, &executor]() { singleBatchDispatch(executor, batchSize, i); });
1629     }
1630   });
1631   evb.loop();
1632
1633   // Reuse the same BatchDispatcher to batch once again.
1634   executor.add([&]() {
1635     int batchSize = 10;
1636     for (int i = 0; i < batchSize; i++) {
1637       executor.add(
1638           [=, &executor]() { singleBatchDispatch(executor, batchSize, i); });
1639     }
1640   });
1641   evb.loop();
1642 }
1643
1644 template <typename ExecutorT>
1645 folly::Future<std::vector<std::string>> doubleBatchInnerDispatch(
1646     ExecutorT& executor,
1647     int totalNumberOfElements,
1648     std::vector<int> input) {
1649   thread_local BatchDispatcher<
1650       std::vector<int>,
1651       std::vector<std::string>,
1652       ExecutorT>
1653   batchDispatcher(executor, [=](std::vector<std::vector<int>>&& batch) {
1654     std::vector<std::vector<std::string>> results;
1655     int numberOfElements = 0;
1656     for (auto& unit : batch) {
1657       numberOfElements += unit.size();
1658       std::vector<std::string> result;
1659       for (auto& element : unit) {
1660         result.push_back(folly::to<std::string>(element));
1661       }
1662       results.push_back(std::move(result));
1663     }
1664     EXPECT_EQ(totalNumberOfElements, numberOfElements);
1665     return results;
1666   });
1667
1668   return batchDispatcher.add(std::move(input));
1669 }
1670
1671 /**
1672  * Batch values in groups of 5, and then call inner dispatch.
1673  */
1674 template <typename ExecutorT>
1675 void doubleBatchOuterDispatch(
1676     ExecutorT& executor,
1677     int totalNumberOfElements,
1678     int index) {
1679   thread_local BatchDispatcher<int, std::string, ExecutorT>
1680   batchDispatcher(executor, [=, &executor](std::vector<int>&& batch) {
1681     EXPECT_EQ(totalNumberOfElements, batch.size());
1682     std::vector<std::string> results;
1683     std::vector<folly::Future<std::vector<std::string>>>
1684         innerDispatchResultFutures;
1685
1686     std::vector<int> group;
1687     for (auto unit : batch) {
1688       group.push_back(unit);
1689       if (group.size() == 5) {
1690         auto localGroup = group;
1691         group.clear();
1692
1693         innerDispatchResultFutures.push_back(doubleBatchInnerDispatch(
1694             executor, totalNumberOfElements, localGroup));
1695       }
1696     }
1697
1698     folly::collectAll(
1699         innerDispatchResultFutures.begin(), innerDispatchResultFutures.end())
1700         .then([&](
1701             std::vector<Try<std::vector<std::string>>> innerDispatchResults) {
1702           for (auto& unit : innerDispatchResults) {
1703             for (auto& element : unit.value()) {
1704               results.push_back(element);
1705             }
1706           }
1707         })
1708         .get();
1709     return results;
1710   });
1711
1712   auto indexCopy = index;
1713   auto result = batchDispatcher.add(std::move(indexCopy));
1714   EXPECT_EQ(folly::to<std::string>(index), result.get());
1715 }
1716
1717 TEST(FiberManager, doubleBatchDispatchTest) {
1718   folly::EventBase evb;
1719   auto& executor = getFiberManager(evb);
1720
1721   // Launch multiple fibers with a single id.
1722   executor.add([&]() {
1723     int totalNumberOfElements = 20;
1724     for (int i = 0; i < totalNumberOfElements; i++) {
1725       executor.add([=, &executor]() {
1726         doubleBatchOuterDispatch(executor, totalNumberOfElements, i);
1727       });
1728     }
1729   });
1730   evb.loop();
1731 }
1732
1733 template <typename ExecutorT>
1734 void batchDispatchExceptionHandling(ExecutorT& executor, int i) {
1735   thread_local BatchDispatcher<int, int, ExecutorT> batchDispatcher(
1736       executor, [=, &executor](std::vector<int> &&) -> std::vector<int> {
1737         throw std::runtime_error("Surprise!!");
1738       });
1739
1740   EXPECT_THROW(batchDispatcher.add(i).get(), std::runtime_error);
1741 }
1742
1743 TEST(FiberManager, batchDispatchExceptionHandlingTest) {
1744   folly::EventBase evb;
1745   auto& executor = getFiberManager(evb);
1746
1747   // Launch multiple fibers with a single id.
1748   executor.add([&]() {
1749     int totalNumberOfElements = 5;
1750     for (int i = 0; i < totalNumberOfElements; i++) {
1751       executor.add(
1752           [=, &executor]() { batchDispatchExceptionHandling(executor, i); });
1753     }
1754   });
1755   evb.loop();
1756 }
1757
1758 namespace AtomicBatchDispatcherTesting {
1759
1760 using ValueT = size_t;
1761 using ResultT = std::string;
1762 using DispatchFunctionT =
1763     folly::Function<std::vector<ResultT>(std::vector<ValueT>&&)>;
1764
1765 #define ENABLE_TRACE_IN_TEST 0 // Set to 1 to debug issues in ABD tests
1766 #if ENABLE_TRACE_IN_TEST
1767 #define OUTPUT_TRACE std::cerr
1768 #else // ENABLE_TRACE_IN_TEST
1769 struct DevNullPiper {
1770   template <typename T>
1771   DevNullPiper& operator<<(const T&) {
1772     return *this;
1773   }
1774
1775   DevNullPiper& operator<<(std::ostream& (*)(std::ostream&)) {
1776     return *this;
1777   }
1778 } devNullPiper;
1779 #define OUTPUT_TRACE devNullPiper
1780 #endif // ENABLE_TRACE_IN_TEST
1781
1782 struct Job {
1783   AtomicBatchDispatcher<ValueT, ResultT>::Token token;
1784   ValueT input;
1785
1786   void preprocess(FiberManager& executor, bool die) {
1787     // Yield for a random duration [0, 10] ms to simulate I/O in preprocessing
1788     clock_t msecToDoIO = folly::Random::rand32() % 10;
1789     double start = (1000.0 * clock()) / CLOCKS_PER_SEC;
1790     double endAfter = start + msecToDoIO;
1791     while ((1000.0 * clock()) / CLOCKS_PER_SEC < endAfter) {
1792       executor.yield();
1793     }
1794     if (die) {
1795       throw std::logic_error("Simulating preprocessing failure");
1796     }
1797   }
1798
1799   Job(AtomicBatchDispatcher<ValueT, ResultT>::Token&& t, ValueT i)
1800       : token(std::move(t)), input(i) {}
1801
1802   Job(Job&&) = default;
1803   Job& operator=(Job&&) = default;
1804 };
1805
1806 ResultT processSingleInput(ValueT&& input) {
1807   return folly::to<ResultT>(std::move(input));
1808 }
1809
1810 std::vector<ResultT> userDispatchFunc(std::vector<ValueT>&& inputs) {
1811   size_t expectedCount = inputs.size();
1812   std::vector<ResultT> results;
1813   results.reserve(expectedCount);
1814   for (size_t i = 0; i < expectedCount; ++i) {
1815     results.emplace_back(processSingleInput(std::move(inputs[i])));
1816   }
1817   return results;
1818 }
1819
1820 void createJobs(
1821     AtomicBatchDispatcher<ValueT, ResultT>& atomicBatchDispatcher,
1822     std::vector<Job>& jobs,
1823     size_t count) {
1824   jobs.clear();
1825   for (size_t i = 0; i < count; ++i) {
1826     jobs.emplace_back(Job(atomicBatchDispatcher.getToken(), i));
1827   }
1828 }
1829
1830 enum class DispatchProblem {
1831   None,
1832   PreprocessThrows,
1833   DuplicateDispatch,
1834 };
1835
1836 void dispatchJobs(
1837     FiberManager& executor,
1838     std::vector<Job>& jobs,
1839     std::vector<folly::Optional<folly::Future<ResultT>>>& results,
1840     DispatchProblem dispatchProblem = DispatchProblem::None,
1841     size_t problemIndex = size_t(-1)) {
1842   EXPECT_TRUE(
1843       dispatchProblem == DispatchProblem::None || problemIndex < jobs.size());
1844   results.clear();
1845   results.resize(jobs.size());
1846   for (size_t i = 0; i < jobs.size(); ++i) {
1847     executor.add(
1848         [i, &executor, &jobs, &results, dispatchProblem, problemIndex]() {
1849           try {
1850             Job job(std::move(jobs[i]));
1851
1852             if (dispatchProblem == DispatchProblem::PreprocessThrows) {
1853               if (i == problemIndex) {
1854                 EXPECT_THROW(job.preprocess(executor, true), std::logic_error);
1855                 return;
1856               }
1857             }
1858
1859             job.preprocess(executor, false);
1860             OUTPUT_TRACE << "Dispatching job #" << i << std::endl;
1861             results[i] = job.token.dispatch(job.input);
1862             OUTPUT_TRACE << "Result future filled for job #" << i << std::endl;
1863
1864             if (dispatchProblem == DispatchProblem::DuplicateDispatch) {
1865               if (i == problemIndex) {
1866                 EXPECT_THROW(job.token.dispatch(job.input), std::logic_error);
1867               }
1868             }
1869           } catch (...) {
1870             OUTPUT_TRACE << "Preprocessing failed for job #" << i << std::endl;
1871           }
1872         });
1873   }
1874 }
1875
1876 void validateResult(
1877     std::vector<folly::Optional<folly::Future<ResultT>>>& results,
1878     size_t i) {
1879   try {
1880     OUTPUT_TRACE << "results[" << i << "].value() : " << results[i]->value()
1881                  << std::endl;
1882   } catch (std::exception& e) {
1883     OUTPUT_TRACE << "Exception : " << e.what() << std::endl;
1884     throw;
1885   }
1886 }
1887
1888 template <typename TException>
1889 void validateResults(
1890     std::vector<folly::Optional<folly::Future<ResultT>>>& results,
1891     size_t expectedNumResults) {
1892   size_t numResultsFilled = 0;
1893   for (size_t i = 0; i < results.size(); ++i) {
1894     if (!results[i]) {
1895       continue;
1896     }
1897     ++numResultsFilled;
1898     EXPECT_THROW(validateResult(results, i), TException);
1899   }
1900   EXPECT_EQ(numResultsFilled, expectedNumResults);
1901 }
1902
1903 void validateResults(
1904     std::vector<folly::Optional<folly::Future<ResultT>>>& results,
1905     size_t expectedNumResults) {
1906   size_t numResultsFilled = 0;
1907   for (size_t i = 0; i < results.size(); ++i) {
1908     if (!results[i]) {
1909       continue;
1910     }
1911     ++numResultsFilled;
1912     EXPECT_NO_THROW(validateResult(results, i));
1913     ValueT expectedInput = i;
1914     EXPECT_EQ(
1915         results[i]->value(), processSingleInput(std::move(expectedInput)));
1916   }
1917   EXPECT_EQ(numResultsFilled, expectedNumResults);
1918 }
1919
1920 } // AtomicBatchDispatcherTesting
1921
1922 #define SET_UP_TEST_FUNC                                        \
1923   using namespace AtomicBatchDispatcherTesting;                 \
1924   folly::EventBase evb;                                         \
1925   auto& executor = getFiberManager(evb);                        \
1926   const size_t COUNT = 11;                                      \
1927   std::vector<Job> jobs;                                        \
1928   jobs.reserve(COUNT);                                          \
1929   std::vector<folly::Optional<folly::Future<ResultT>>> results; \
1930   results.reserve(COUNT);                                       \
1931   DispatchFunctionT dispatchFunc
1932
1933 TEST(FiberManager, ABD_Test) {
1934   SET_UP_TEST_FUNC;
1935
1936   //
1937   // Testing AtomicBatchDispatcher with explicit call to commit()
1938   //
1939   dispatchFunc = userDispatchFunc;
1940   auto atomicBatchDispatcher =
1941       createAtomicBatchDispatcher(std::move(dispatchFunc));
1942   createJobs(atomicBatchDispatcher, jobs, COUNT);
1943   dispatchJobs(executor, jobs, results);
1944   atomicBatchDispatcher.commit();
1945   evb.loop();
1946   validateResults(results, COUNT);
1947 }
1948
1949 TEST(FiberManager, ABD_DispatcherDestroyedBeforeCallingCommit) {
1950   SET_UP_TEST_FUNC;
1951
1952   //
1953   // Testing AtomicBatchDispatcher destroyed before calling commit.
1954   // Handles error cases for:
1955   // - User might have forgotten to add the call to commit() in the code
1956   // - An unexpected exception got thrown in user code before commit() is called
1957   //
1958   try {
1959     dispatchFunc = userDispatchFunc;
1960     auto atomicBatchDispatcher =
1961         createAtomicBatchDispatcher(std::move(dispatchFunc));
1962     createJobs(atomicBatchDispatcher, jobs, COUNT);
1963     dispatchJobs(executor, jobs, results);
1964     throw std::runtime_error(
1965         "Unexpected exception in user code before commit called");
1966     atomicBatchDispatcher.commit();
1967   } catch (...) {
1968     /* User code handles the exception and does not exit process */
1969   }
1970   evb.loop();
1971   validateResults<std::logic_error>(results, COUNT);
1972 }
1973
1974 TEST(FiberManager, ABD_PreprocessingFailureTest) {
1975   SET_UP_TEST_FUNC;
1976
1977   //
1978   // Testing preprocessing failure on a job throws
1979   //
1980   dispatchFunc = userDispatchFunc;
1981   auto atomicBatchDispatcher =
1982       createAtomicBatchDispatcher(std::move(dispatchFunc));
1983   createJobs(atomicBatchDispatcher, jobs, COUNT);
1984   dispatchJobs(executor, jobs, results, DispatchProblem::PreprocessThrows, 8);
1985   atomicBatchDispatcher.commit();
1986   evb.loop();
1987   validateResults<std::logic_error>(results, COUNT - 1);
1988 }
1989
1990 TEST(FiberManager, ABD_MultipleDispatchOnSameTokenErrorTest) {
1991   SET_UP_TEST_FUNC;
1992
1993   //
1994   // Testing that calling dispatch more than once on the same token throws
1995   //
1996   dispatchFunc = userDispatchFunc;
1997   auto atomicBatchDispatcher =
1998       createAtomicBatchDispatcher(std::move(dispatchFunc));
1999   createJobs(atomicBatchDispatcher, jobs, COUNT);
2000   dispatchJobs(executor, jobs, results, DispatchProblem::DuplicateDispatch, 4);
2001   atomicBatchDispatcher.commit();
2002   evb.loop();
2003 }
2004
2005 TEST(FiberManager, ABD_GetTokenCalledAfterCommitTest) {
2006   SET_UP_TEST_FUNC;
2007
2008   //
2009   // Testing that exception set on attempt to call getToken after commit called
2010   //
2011   dispatchFunc = userDispatchFunc;
2012   auto atomicBatchDispatcher =
2013       createAtomicBatchDispatcher(std::move(dispatchFunc));
2014   createJobs(atomicBatchDispatcher, jobs, COUNT);
2015   atomicBatchDispatcher.commit();
2016   EXPECT_THROW(atomicBatchDispatcher.getToken(), std::logic_error);
2017   dispatchJobs(executor, jobs, results);
2018   EXPECT_THROW(atomicBatchDispatcher.getToken(), std::logic_error);
2019   evb.loop();
2020   validateResults(results, COUNT);
2021   EXPECT_THROW(atomicBatchDispatcher.getToken(), std::logic_error);
2022 }
2023
2024 TEST(FiberManager, ABD_UserProvidedBatchDispatchThrowsTest) {
2025   SET_UP_TEST_FUNC;
2026
2027   //
2028   // Testing that exception is set if user provided batch dispatch throws
2029   //
2030   dispatchFunc = [](std::vector<ValueT>&& inputs) -> std::vector<ResultT> {
2031     auto results = userDispatchFunc(std::move(inputs));
2032     throw std::runtime_error("Unexpected exception in user dispatch function");
2033     return results;
2034   };
2035   auto atomicBatchDispatcher =
2036       createAtomicBatchDispatcher(std::move(dispatchFunc));
2037   createJobs(atomicBatchDispatcher, jobs, COUNT);
2038   dispatchJobs(executor, jobs, results);
2039   atomicBatchDispatcher.commit();
2040   evb.loop();
2041   validateResults<std::runtime_error>(results, COUNT);
2042 }
2043
2044 /**
2045  * Test that we can properly track fiber stack usage.
2046  *
2047  * This functionality can only be enabled when ASAN is disabled, so avoid
2048  * running this test with ASAN.
2049  */
2050 #ifndef FOLLY_SANITIZE_ADDRESS
2051 TEST(FiberManager, recordStack) {
2052   std::thread([] {
2053     folly::fibers::FiberManager::Options opts;
2054     opts.recordStackEvery = 1;
2055
2056     FiberManager fm(folly::make_unique<SimpleLoopController>(), opts);
2057     auto& loopController =
2058         dynamic_cast<SimpleLoopController&>(fm.loopController());
2059
2060     static constexpr size_t n = 1000;
2061     int s = 0;
2062     fm.addTask([&]() {
2063       int b[n] = {0};
2064       for (size_t i = 0; i < n; ++i) {
2065         b[i] = i;
2066       }
2067       for (size_t i = 0; i + 1 < n; ++i) {
2068         s += b[i] * b[i + 1];
2069       }
2070     });
2071
2072     (void)s;
2073
2074     loopController.loop([&]() { loopController.stop(); });
2075
2076     // Check that we properly accounted fiber stack usage.
2077     EXPECT_LT(n * sizeof(int), fm.stackHighWatermark());
2078   }).join();
2079 }
2080 #endif