VirtualEventBase
[folly.git] / folly / fibers / test / FibersTest.cpp
1 /*
2  * Copyright 2016 Facebook, Inc.
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  *   http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 #include <atomic>
17 #include <thread>
18 #include <vector>
19
20 #include <folly/Memory.h>
21 #include <folly/Random.h>
22 #include <folly/futures/Future.h>
23
24 #include <folly/Conv.h>
25 #include <folly/fibers/AddTasks.h>
26 #include <folly/fibers/AtomicBatchDispatcher.h>
27 #include <folly/fibers/BatchDispatcher.h>
28 #include <folly/fibers/EventBaseLoopController.h>
29 #include <folly/fibers/FiberManager.h>
30 #include <folly/fibers/FiberManagerMap.h>
31 #include <folly/fibers/GenericBaton.h>
32 #include <folly/fibers/Semaphore.h>
33 #include <folly/fibers/SimpleLoopController.h>
34 #include <folly/fibers/WhenN.h>
35 #include <folly/io/async/ScopedEventBaseThread.h>
36 #include <folly/portability/GTest.h>
37
38 using namespace folly::fibers;
39
40 using folly::Try;
41
42 TEST(FiberManager, batonTimedWaitTimeout) {
43   bool taskAdded = false;
44   size_t iterations = 0;
45
46   FiberManager manager(folly::make_unique<SimpleLoopController>());
47   auto& loopController =
48       dynamic_cast<SimpleLoopController&>(manager.loopController());
49
50   auto loopFunc = [&]() {
51     if (!taskAdded) {
52       manager.addTask([&]() {
53         Baton baton;
54
55         auto res = baton.timed_wait(std::chrono::milliseconds(230));
56
57         EXPECT_FALSE(res);
58         EXPECT_EQ(5, iterations);
59
60         loopController.stop();
61       });
62       manager.addTask([&]() {
63         Baton baton;
64
65         auto res = baton.timed_wait(std::chrono::milliseconds(130));
66
67         EXPECT_FALSE(res);
68         EXPECT_EQ(3, iterations);
69
70         loopController.stop();
71       });
72       taskAdded = true;
73     } else {
74       std::this_thread::sleep_for(std::chrono::milliseconds(50));
75       iterations++;
76     }
77   };
78
79   loopController.loop(std::move(loopFunc));
80 }
81
82 TEST(FiberManager, batonTimedWaitPost) {
83   bool taskAdded = false;
84   size_t iterations = 0;
85   Baton* baton_ptr;
86
87   FiberManager manager(folly::make_unique<SimpleLoopController>());
88   auto& loopController =
89       dynamic_cast<SimpleLoopController&>(manager.loopController());
90
91   auto loopFunc = [&]() {
92     if (!taskAdded) {
93       manager.addTask([&]() {
94         Baton baton;
95         baton_ptr = &baton;
96
97         auto res = baton.timed_wait(std::chrono::milliseconds(130));
98
99         EXPECT_TRUE(res);
100         EXPECT_EQ(2, iterations);
101
102         loopController.stop();
103       });
104       taskAdded = true;
105     } else {
106       std::this_thread::sleep_for(std::chrono::milliseconds(50));
107       iterations++;
108       if (iterations == 2) {
109         baton_ptr->post();
110       }
111     }
112   };
113
114   loopController.loop(std::move(loopFunc));
115 }
116
117 TEST(FiberManager, batonTimedWaitTimeoutEvb) {
118   size_t tasksComplete = 0;
119
120   folly::EventBase evb;
121
122   FiberManager manager(folly::make_unique<EventBaseLoopController>());
123   dynamic_cast<EventBaseLoopController&>(manager.loopController())
124       .attachEventBase(evb);
125
126   auto task = [&](size_t timeout_ms) {
127     Baton baton;
128
129     auto start = EventBaseLoopController::Clock::now();
130     auto res = baton.timed_wait(std::chrono::milliseconds(timeout_ms));
131     auto finish = EventBaseLoopController::Clock::now();
132
133     EXPECT_FALSE(res);
134
135     auto duration_ms =
136         std::chrono::duration_cast<std::chrono::milliseconds>(finish - start);
137
138     EXPECT_GT(duration_ms.count(), timeout_ms - 50);
139     EXPECT_LT(duration_ms.count(), timeout_ms + 50);
140
141     if (++tasksComplete == 2) {
142       evb.terminateLoopSoon();
143     }
144   };
145
146   evb.runInEventBaseThread([&]() {
147     manager.addTask([&]() { task(500); });
148     manager.addTask([&]() { task(250); });
149   });
150
151   evb.loopForever();
152
153   EXPECT_EQ(2, tasksComplete);
154 }
155
156 TEST(FiberManager, batonTimedWaitPostEvb) {
157   size_t tasksComplete = 0;
158
159   folly::EventBase evb;
160
161   FiberManager manager(folly::make_unique<EventBaseLoopController>());
162   dynamic_cast<EventBaseLoopController&>(manager.loopController())
163       .attachEventBase(evb);
164
165   evb.runInEventBaseThread([&]() {
166     manager.addTask([&]() {
167       Baton baton;
168
169       evb.tryRunAfterDelay([&]() { baton.post(); }, 100);
170
171       auto start = EventBaseLoopController::Clock::now();
172       auto res = baton.timed_wait(std::chrono::milliseconds(130));
173       auto finish = EventBaseLoopController::Clock::now();
174
175       EXPECT_TRUE(res);
176
177       auto duration_ms =
178           std::chrono::duration_cast<std::chrono::milliseconds>(finish - start);
179
180       EXPECT_TRUE(duration_ms.count() > 95 && duration_ms.count() < 110);
181
182       if (++tasksComplete == 1) {
183         evb.terminateLoopSoon();
184       }
185     });
186   });
187
188   evb.loopForever();
189
190   EXPECT_EQ(1, tasksComplete);
191 }
192
193 TEST(FiberManager, batonTryWait) {
194   FiberManager manager(folly::make_unique<SimpleLoopController>());
195
196   // Check if try_wait and post work as expected
197   Baton b;
198
199   manager.addTask([&]() {
200     while (!b.try_wait()) {
201     }
202   });
203   auto thr = std::thread([&]() {
204     std::this_thread::sleep_for(std::chrono::milliseconds(300));
205     b.post();
206   });
207
208   manager.loopUntilNoReady();
209   thr.join();
210
211   Baton c;
212
213   // Check try_wait without post
214   manager.addTask([&]() {
215     int cnt = 100;
216     while (cnt && !c.try_wait()) {
217       cnt--;
218     }
219     EXPECT_TRUE(!c.try_wait()); // must still hold
220     EXPECT_EQ(cnt, 0);
221   });
222
223   manager.loopUntilNoReady();
224 }
225
226 TEST(FiberManager, genericBatonFiberWait) {
227   FiberManager manager(folly::make_unique<SimpleLoopController>());
228
229   GenericBaton b;
230   bool fiberRunning = false;
231
232   manager.addTask([&]() {
233     EXPECT_EQ(manager.hasActiveFiber(), true);
234     fiberRunning = true;
235     b.wait();
236     fiberRunning = false;
237   });
238
239   EXPECT_FALSE(fiberRunning);
240   manager.loopUntilNoReady();
241   EXPECT_TRUE(fiberRunning); // ensure fiber still active
242
243   auto thr = std::thread([&]() {
244     std::this_thread::sleep_for(std::chrono::milliseconds(300));
245     b.post();
246   });
247
248   while (fiberRunning) {
249     manager.loopUntilNoReady();
250   }
251
252   thr.join();
253 }
254
255 TEST(FiberManager, genericBatonThreadWait) {
256   FiberManager manager(folly::make_unique<SimpleLoopController>());
257   GenericBaton b;
258   std::atomic<bool> threadWaiting(false);
259
260   auto thr = std::thread([&]() {
261     threadWaiting = true;
262     b.wait();
263     threadWaiting = false;
264   });
265
266   while (!threadWaiting) {
267   }
268   std::this_thread::sleep_for(std::chrono::milliseconds(300));
269
270   manager.addTask([&]() {
271     EXPECT_EQ(manager.hasActiveFiber(), true);
272     EXPECT_TRUE(threadWaiting);
273     b.post();
274     while (threadWaiting) {
275     }
276   });
277
278   manager.loopUntilNoReady();
279   thr.join();
280 }
281
282 TEST(FiberManager, addTasksNoncopyable) {
283   std::vector<Promise<int>> pendingFibers;
284   bool taskAdded = false;
285
286   FiberManager manager(folly::make_unique<SimpleLoopController>());
287   auto& loopController =
288       dynamic_cast<SimpleLoopController&>(manager.loopController());
289
290   auto loopFunc = [&]() {
291     if (!taskAdded) {
292       manager.addTask([&]() {
293         std::vector<std::function<std::unique_ptr<int>()>> funcs;
294         for (size_t i = 0; i < 3; ++i) {
295           funcs.push_back([i, &pendingFibers]() {
296             await([&pendingFibers](Promise<int> promise) {
297               pendingFibers.push_back(std::move(promise));
298             });
299             return folly::make_unique<int>(i * 2 + 1);
300           });
301         }
302
303         auto iter = addTasks(funcs.begin(), funcs.end());
304
305         size_t n = 0;
306         while (iter.hasNext()) {
307           auto result = iter.awaitNext();
308           EXPECT_EQ(2 * iter.getTaskID() + 1, *result);
309           EXPECT_GE(2 - n, pendingFibers.size());
310           ++n;
311         }
312         EXPECT_EQ(3, n);
313       });
314       taskAdded = true;
315     } else if (pendingFibers.size()) {
316       pendingFibers.back().setValue(0);
317       pendingFibers.pop_back();
318     } else {
319       loopController.stop();
320     }
321   };
322
323   loopController.loop(std::move(loopFunc));
324 }
325
326 TEST(FiberManager, awaitThrow) {
327   folly::EventBase evb;
328   struct ExpectedException {};
329   getFiberManager(evb)
330       .addTaskFuture([&] {
331         EXPECT_THROW(
332           await([](Promise<int> p) {
333               p.setValue(42);
334               throw ExpectedException();
335             }),
336           ExpectedException
337         );
338
339         EXPECT_THROW(
340           await([&](Promise<int> p) {
341               evb.runInEventBaseThread([p = std::move(p)]() mutable {
342                   p.setValue(42);
343                 });
344               throw ExpectedException();
345             }),
346           ExpectedException);
347       })
348       .waitVia(&evb);
349 }
350
351 TEST(FiberManager, addTasksThrow) {
352   std::vector<Promise<int>> pendingFibers;
353   bool taskAdded = false;
354
355   FiberManager manager(folly::make_unique<SimpleLoopController>());
356   auto& loopController =
357       dynamic_cast<SimpleLoopController&>(manager.loopController());
358
359   auto loopFunc = [&]() {
360     if (!taskAdded) {
361       manager.addTask([&]() {
362         std::vector<std::function<int()>> funcs;
363         for (size_t i = 0; i < 3; ++i) {
364           funcs.push_back([i, &pendingFibers]() {
365             await([&pendingFibers](Promise<int> promise) {
366               pendingFibers.push_back(std::move(promise));
367             });
368             if (i % 2 == 0) {
369               throw std::runtime_error("Runtime");
370             }
371             return i * 2 + 1;
372           });
373         }
374
375         auto iter = addTasks(funcs.begin(), funcs.end());
376
377         size_t n = 0;
378         while (iter.hasNext()) {
379           try {
380             int result = iter.awaitNext();
381             EXPECT_EQ(1, iter.getTaskID() % 2);
382             EXPECT_EQ(2 * iter.getTaskID() + 1, result);
383           } catch (...) {
384             EXPECT_EQ(0, iter.getTaskID() % 2);
385           }
386           EXPECT_GE(2 - n, pendingFibers.size());
387           ++n;
388         }
389         EXPECT_EQ(3, n);
390       });
391       taskAdded = true;
392     } else if (pendingFibers.size()) {
393       pendingFibers.back().setValue(0);
394       pendingFibers.pop_back();
395     } else {
396       loopController.stop();
397     }
398   };
399
400   loopController.loop(std::move(loopFunc));
401 }
402
403 TEST(FiberManager, addTasksVoid) {
404   std::vector<Promise<int>> pendingFibers;
405   bool taskAdded = false;
406
407   FiberManager manager(folly::make_unique<SimpleLoopController>());
408   auto& loopController =
409       dynamic_cast<SimpleLoopController&>(manager.loopController());
410
411   auto loopFunc = [&]() {
412     if (!taskAdded) {
413       manager.addTask([&]() {
414         std::vector<std::function<void()>> funcs;
415         for (size_t i = 0; i < 3; ++i) {
416           funcs.push_back([i, &pendingFibers]() {
417             await([&pendingFibers](Promise<int> promise) {
418               pendingFibers.push_back(std::move(promise));
419             });
420           });
421         }
422
423         auto iter = addTasks(funcs.begin(), funcs.end());
424
425         size_t n = 0;
426         while (iter.hasNext()) {
427           iter.awaitNext();
428           EXPECT_GE(2 - n, pendingFibers.size());
429           ++n;
430         }
431         EXPECT_EQ(3, n);
432       });
433       taskAdded = true;
434     } else if (pendingFibers.size()) {
435       pendingFibers.back().setValue(0);
436       pendingFibers.pop_back();
437     } else {
438       loopController.stop();
439     }
440   };
441
442   loopController.loop(std::move(loopFunc));
443 }
444
445 TEST(FiberManager, addTasksVoidThrow) {
446   std::vector<Promise<int>> pendingFibers;
447   bool taskAdded = false;
448
449   FiberManager manager(folly::make_unique<SimpleLoopController>());
450   auto& loopController =
451       dynamic_cast<SimpleLoopController&>(manager.loopController());
452
453   auto loopFunc = [&]() {
454     if (!taskAdded) {
455       manager.addTask([&]() {
456         std::vector<std::function<void()>> funcs;
457         for (size_t i = 0; i < 3; ++i) {
458           funcs.push_back([i, &pendingFibers]() {
459             await([&pendingFibers](Promise<int> promise) {
460               pendingFibers.push_back(std::move(promise));
461             });
462             if (i % 2 == 0) {
463               throw std::runtime_error("");
464             }
465           });
466         }
467
468         auto iter = addTasks(funcs.begin(), funcs.end());
469
470         size_t n = 0;
471         while (iter.hasNext()) {
472           try {
473             iter.awaitNext();
474             EXPECT_EQ(1, iter.getTaskID() % 2);
475           } catch (...) {
476             EXPECT_EQ(0, iter.getTaskID() % 2);
477           }
478           EXPECT_GE(2 - n, pendingFibers.size());
479           ++n;
480         }
481         EXPECT_EQ(3, n);
482       });
483       taskAdded = true;
484     } else if (pendingFibers.size()) {
485       pendingFibers.back().setValue(0);
486       pendingFibers.pop_back();
487     } else {
488       loopController.stop();
489     }
490   };
491
492   loopController.loop(std::move(loopFunc));
493 }
494
495 TEST(FiberManager, addTasksReserve) {
496   std::vector<Promise<int>> pendingFibers;
497   bool taskAdded = false;
498
499   FiberManager manager(folly::make_unique<SimpleLoopController>());
500   auto& loopController =
501       dynamic_cast<SimpleLoopController&>(manager.loopController());
502
503   auto loopFunc = [&]() {
504     if (!taskAdded) {
505       manager.addTask([&]() {
506         std::vector<std::function<void()>> funcs;
507         for (size_t i = 0; i < 3; ++i) {
508           funcs.push_back([&pendingFibers]() {
509             await([&pendingFibers](Promise<int> promise) {
510               pendingFibers.push_back(std::move(promise));
511             });
512           });
513         }
514
515         auto iter = addTasks(funcs.begin(), funcs.end());
516
517         iter.reserve(2);
518         EXPECT_TRUE(iter.hasCompleted());
519         EXPECT_TRUE(iter.hasPending());
520         EXPECT_TRUE(iter.hasNext());
521
522         iter.awaitNext();
523         EXPECT_TRUE(iter.hasCompleted());
524         EXPECT_TRUE(iter.hasPending());
525         EXPECT_TRUE(iter.hasNext());
526
527         iter.awaitNext();
528         EXPECT_FALSE(iter.hasCompleted());
529         EXPECT_TRUE(iter.hasPending());
530         EXPECT_TRUE(iter.hasNext());
531
532         iter.awaitNext();
533         EXPECT_FALSE(iter.hasCompleted());
534         EXPECT_FALSE(iter.hasPending());
535         EXPECT_FALSE(iter.hasNext());
536       });
537       taskAdded = true;
538     } else if (pendingFibers.size()) {
539       pendingFibers.back().setValue(0);
540       pendingFibers.pop_back();
541     } else {
542       loopController.stop();
543     }
544   };
545
546   loopController.loop(std::move(loopFunc));
547 }
548
549 TEST(FiberManager, addTaskDynamic) {
550   folly::EventBase evb;
551
552   Baton batons[3];
553
554   auto makeTask = [&](size_t taskId) {
555     return [&, taskId]() -> size_t {
556       batons[taskId].wait();
557       return taskId;
558     };
559   };
560
561   getFiberManager(evb)
562       .addTaskFuture([&]() {
563         TaskIterator<size_t> iterator;
564
565         iterator.addTask(makeTask(0));
566         iterator.addTask(makeTask(1));
567
568         batons[1].post();
569
570         EXPECT_EQ(1, iterator.awaitNext());
571
572         iterator.addTask(makeTask(2));
573
574         batons[2].post();
575
576         EXPECT_EQ(2, iterator.awaitNext());
577
578         batons[0].post();
579
580         EXPECT_EQ(0, iterator.awaitNext());
581       })
582       .waitVia(&evb);
583 }
584
585 TEST(FiberManager, forEach) {
586   std::vector<Promise<int>> pendingFibers;
587   bool taskAdded = false;
588
589   FiberManager manager(folly::make_unique<SimpleLoopController>());
590   auto& loopController =
591       dynamic_cast<SimpleLoopController&>(manager.loopController());
592
593   auto loopFunc = [&]() {
594     if (!taskAdded) {
595       manager.addTask([&]() {
596         std::vector<std::function<int()>> funcs;
597         for (size_t i = 0; i < 3; ++i) {
598           funcs.push_back([i, &pendingFibers]() {
599             await([&pendingFibers](Promise<int> promise) {
600               pendingFibers.push_back(std::move(promise));
601             });
602             return i * 2 + 1;
603           });
604         }
605
606         std::vector<std::pair<size_t, int>> results;
607         forEach(funcs.begin(), funcs.end(), [&results](size_t id, int result) {
608           results.emplace_back(id, result);
609         });
610         EXPECT_EQ(3, results.size());
611         EXPECT_TRUE(pendingFibers.empty());
612         for (size_t i = 0; i < 3; ++i) {
613           EXPECT_EQ(results[i].first * 2 + 1, results[i].second);
614         }
615       });
616       taskAdded = true;
617     } else if (pendingFibers.size()) {
618       pendingFibers.back().setValue(0);
619       pendingFibers.pop_back();
620     } else {
621       loopController.stop();
622     }
623   };
624
625   loopController.loop(std::move(loopFunc));
626 }
627
628 TEST(FiberManager, collectN) {
629   std::vector<Promise<int>> pendingFibers;
630   bool taskAdded = false;
631
632   FiberManager manager(folly::make_unique<SimpleLoopController>());
633   auto& loopController =
634       dynamic_cast<SimpleLoopController&>(manager.loopController());
635
636   auto loopFunc = [&]() {
637     if (!taskAdded) {
638       manager.addTask([&]() {
639         std::vector<std::function<int()>> funcs;
640         for (size_t i = 0; i < 3; ++i) {
641           funcs.push_back([i, &pendingFibers]() {
642             await([&pendingFibers](Promise<int> promise) {
643               pendingFibers.push_back(std::move(promise));
644             });
645             return i * 2 + 1;
646           });
647         }
648
649         auto results = collectN(funcs.begin(), funcs.end(), 2);
650         EXPECT_EQ(2, results.size());
651         EXPECT_EQ(1, pendingFibers.size());
652         for (size_t i = 0; i < 2; ++i) {
653           EXPECT_EQ(results[i].first * 2 + 1, results[i].second);
654         }
655       });
656       taskAdded = true;
657     } else if (pendingFibers.size()) {
658       pendingFibers.back().setValue(0);
659       pendingFibers.pop_back();
660     } else {
661       loopController.stop();
662     }
663   };
664
665   loopController.loop(std::move(loopFunc));
666 }
667
668 TEST(FiberManager, collectNThrow) {
669   std::vector<Promise<int>> pendingFibers;
670   bool taskAdded = false;
671
672   FiberManager manager(folly::make_unique<SimpleLoopController>());
673   auto& loopController =
674       dynamic_cast<SimpleLoopController&>(manager.loopController());
675
676   auto loopFunc = [&]() {
677     if (!taskAdded) {
678       manager.addTask([&]() {
679         std::vector<std::function<int()>> funcs;
680         for (size_t i = 0; i < 3; ++i) {
681           funcs.push_back([i, &pendingFibers]() {
682             await([&pendingFibers](Promise<int> promise) {
683               pendingFibers.push_back(std::move(promise));
684             });
685             throw std::runtime_error("Runtime");
686             return i * 2 + 1;
687           });
688         }
689
690         try {
691           collectN(funcs.begin(), funcs.end(), 2);
692         } catch (...) {
693           EXPECT_EQ(1, pendingFibers.size());
694         }
695       });
696       taskAdded = true;
697     } else if (pendingFibers.size()) {
698       pendingFibers.back().setValue(0);
699       pendingFibers.pop_back();
700     } else {
701       loopController.stop();
702     }
703   };
704
705   loopController.loop(std::move(loopFunc));
706 }
707
708 TEST(FiberManager, collectNVoid) {
709   std::vector<Promise<int>> pendingFibers;
710   bool taskAdded = false;
711
712   FiberManager manager(folly::make_unique<SimpleLoopController>());
713   auto& loopController =
714       dynamic_cast<SimpleLoopController&>(manager.loopController());
715
716   auto loopFunc = [&]() {
717     if (!taskAdded) {
718       manager.addTask([&]() {
719         std::vector<std::function<void()>> funcs;
720         for (size_t i = 0; i < 3; ++i) {
721           funcs.push_back([i, &pendingFibers]() {
722             await([&pendingFibers](Promise<int> promise) {
723               pendingFibers.push_back(std::move(promise));
724             });
725           });
726         }
727
728         auto results = collectN(funcs.begin(), funcs.end(), 2);
729         EXPECT_EQ(2, results.size());
730         EXPECT_EQ(1, pendingFibers.size());
731       });
732       taskAdded = true;
733     } else if (pendingFibers.size()) {
734       pendingFibers.back().setValue(0);
735       pendingFibers.pop_back();
736     } else {
737       loopController.stop();
738     }
739   };
740
741   loopController.loop(std::move(loopFunc));
742 }
743
744 TEST(FiberManager, collectNVoidThrow) {
745   std::vector<Promise<int>> pendingFibers;
746   bool taskAdded = false;
747
748   FiberManager manager(folly::make_unique<SimpleLoopController>());
749   auto& loopController =
750       dynamic_cast<SimpleLoopController&>(manager.loopController());
751
752   auto loopFunc = [&]() {
753     if (!taskAdded) {
754       manager.addTask([&]() {
755         std::vector<std::function<void()>> funcs;
756         for (size_t i = 0; i < 3; ++i) {
757           funcs.push_back([i, &pendingFibers]() {
758             await([&pendingFibers](Promise<int> promise) {
759               pendingFibers.push_back(std::move(promise));
760             });
761             throw std::runtime_error("Runtime");
762           });
763         }
764
765         try {
766           collectN(funcs.begin(), funcs.end(), 2);
767         } catch (...) {
768           EXPECT_EQ(1, pendingFibers.size());
769         }
770       });
771       taskAdded = true;
772     } else if (pendingFibers.size()) {
773       pendingFibers.back().setValue(0);
774       pendingFibers.pop_back();
775     } else {
776       loopController.stop();
777     }
778   };
779
780   loopController.loop(std::move(loopFunc));
781 }
782
783 TEST(FiberManager, collectAll) {
784   std::vector<Promise<int>> pendingFibers;
785   bool taskAdded = false;
786
787   FiberManager manager(folly::make_unique<SimpleLoopController>());
788   auto& loopController =
789       dynamic_cast<SimpleLoopController&>(manager.loopController());
790
791   auto loopFunc = [&]() {
792     if (!taskAdded) {
793       manager.addTask([&]() {
794         std::vector<std::function<int()>> funcs;
795         for (size_t i = 0; i < 3; ++i) {
796           funcs.push_back([i, &pendingFibers]() {
797             await([&pendingFibers](Promise<int> promise) {
798               pendingFibers.push_back(std::move(promise));
799             });
800             return i * 2 + 1;
801           });
802         }
803
804         auto results = collectAll(funcs.begin(), funcs.end());
805         EXPECT_TRUE(pendingFibers.empty());
806         for (size_t i = 0; i < 3; ++i) {
807           EXPECT_EQ(i * 2 + 1, results[i]);
808         }
809       });
810       taskAdded = true;
811     } else if (pendingFibers.size()) {
812       pendingFibers.back().setValue(0);
813       pendingFibers.pop_back();
814     } else {
815       loopController.stop();
816     }
817   };
818
819   loopController.loop(std::move(loopFunc));
820 }
821
822 TEST(FiberManager, collectAllVoid) {
823   std::vector<Promise<int>> pendingFibers;
824   bool taskAdded = false;
825
826   FiberManager manager(folly::make_unique<SimpleLoopController>());
827   auto& loopController =
828       dynamic_cast<SimpleLoopController&>(manager.loopController());
829
830   auto loopFunc = [&]() {
831     if (!taskAdded) {
832       manager.addTask([&]() {
833         std::vector<std::function<void()>> funcs;
834         for (size_t i = 0; i < 3; ++i) {
835           funcs.push_back([i, &pendingFibers]() {
836             await([&pendingFibers](Promise<int> promise) {
837               pendingFibers.push_back(std::move(promise));
838             });
839           });
840         }
841
842         collectAll(funcs.begin(), funcs.end());
843         EXPECT_TRUE(pendingFibers.empty());
844       });
845       taskAdded = true;
846     } else if (pendingFibers.size()) {
847       pendingFibers.back().setValue(0);
848       pendingFibers.pop_back();
849     } else {
850       loopController.stop();
851     }
852   };
853
854   loopController.loop(std::move(loopFunc));
855 }
856
857 TEST(FiberManager, collectAny) {
858   std::vector<Promise<int>> pendingFibers;
859   bool taskAdded = false;
860
861   FiberManager manager(folly::make_unique<SimpleLoopController>());
862   auto& loopController =
863       dynamic_cast<SimpleLoopController&>(manager.loopController());
864
865   auto loopFunc = [&]() {
866     if (!taskAdded) {
867       manager.addTask([&]() {
868         std::vector<std::function<int()>> funcs;
869         for (size_t i = 0; i < 3; ++i) {
870           funcs.push_back([i, &pendingFibers]() {
871             await([&pendingFibers](Promise<int> promise) {
872               pendingFibers.push_back(std::move(promise));
873             });
874             if (i == 1) {
875               throw std::runtime_error("This exception will be ignored");
876             }
877             return i * 2 + 1;
878           });
879         }
880
881         auto result = collectAny(funcs.begin(), funcs.end());
882         EXPECT_EQ(2, pendingFibers.size());
883         EXPECT_EQ(2, result.first);
884         EXPECT_EQ(2 * 2 + 1, result.second);
885       });
886       taskAdded = true;
887     } else if (pendingFibers.size()) {
888       pendingFibers.back().setValue(0);
889       pendingFibers.pop_back();
890     } else {
891       loopController.stop();
892     }
893   };
894
895   loopController.loop(std::move(loopFunc));
896 }
897
898 namespace {
899 /* Checks that this function was run from a main context,
900    by comparing an address on a stack to a known main stack address
901    and a known related fiber stack address.  The assumption
902    is that fiber stack and main stack will be far enough apart,
903    while any two values on the same stack will be close. */
904 void expectMainContext(bool& ran, int* mainLocation, int* fiberLocation) {
905   int here;
906   /* 2 pages is a good guess */
907   constexpr ssize_t DISTANCE = 0x2000 / sizeof(int);
908   if (fiberLocation) {
909     EXPECT_TRUE(std::abs(&here - fiberLocation) > DISTANCE);
910   }
911   if (mainLocation) {
912     EXPECT_TRUE(std::abs(&here - mainLocation) < DISTANCE);
913   }
914
915   EXPECT_FALSE(ran);
916   ran = true;
917 }
918 }
919
920 TEST(FiberManager, runInMainContext) {
921   FiberManager manager(folly::make_unique<SimpleLoopController>());
922   auto& loopController =
923       dynamic_cast<SimpleLoopController&>(manager.loopController());
924
925   bool checkRan = false;
926
927   int mainLocation;
928   manager.runInMainContext(
929       [&]() { expectMainContext(checkRan, &mainLocation, nullptr); });
930   EXPECT_TRUE(checkRan);
931
932   checkRan = false;
933
934   manager.addTask([&]() {
935     struct A {
936       explicit A(int value_) : value(value_) {}
937       A(const A&) = delete;
938       A(A&&) = default;
939
940       int value;
941     };
942     int stackLocation;
943     auto ret = runInMainContext([&]() {
944       expectMainContext(checkRan, &mainLocation, &stackLocation);
945       return A(42);
946     });
947     EXPECT_TRUE(checkRan);
948     EXPECT_EQ(42, ret.value);
949   });
950
951   loopController.loop([&]() { loopController.stop(); });
952
953   EXPECT_TRUE(checkRan);
954 }
955
956 TEST(FiberManager, addTaskFinally) {
957   FiberManager manager(folly::make_unique<SimpleLoopController>());
958   auto& loopController =
959       dynamic_cast<SimpleLoopController&>(manager.loopController());
960
961   bool checkRan = false;
962
963   int mainLocation;
964
965   manager.addTaskFinally(
966       [&]() { return 1234; },
967       [&](Try<int>&& result) {
968         EXPECT_EQ(result.value(), 1234);
969
970         expectMainContext(checkRan, &mainLocation, nullptr);
971       });
972
973   EXPECT_FALSE(checkRan);
974
975   loopController.loop([&]() { loopController.stop(); });
976
977   EXPECT_TRUE(checkRan);
978 }
979
980 TEST(FiberManager, fibersPoolWithinLimit) {
981   FiberManager::Options opts;
982   opts.maxFibersPoolSize = 5;
983
984   FiberManager manager(folly::make_unique<SimpleLoopController>(), opts);
985   auto& loopController =
986       dynamic_cast<SimpleLoopController&>(manager.loopController());
987
988   size_t fibersRun = 0;
989
990   for (size_t i = 0; i < 5; ++i) {
991     manager.addTask([&]() { ++fibersRun; });
992   }
993   loopController.loop([&]() { loopController.stop(); });
994
995   EXPECT_EQ(5, fibersRun);
996   EXPECT_EQ(5, manager.fibersAllocated());
997   EXPECT_EQ(5, manager.fibersPoolSize());
998
999   for (size_t i = 0; i < 5; ++i) {
1000     manager.addTask([&]() { ++fibersRun; });
1001   }
1002   loopController.loop([&]() { loopController.stop(); });
1003
1004   EXPECT_EQ(10, fibersRun);
1005   EXPECT_EQ(5, manager.fibersAllocated());
1006   EXPECT_EQ(5, manager.fibersPoolSize());
1007 }
1008
1009 TEST(FiberManager, fibersPoolOverLimit) {
1010   FiberManager::Options opts;
1011   opts.maxFibersPoolSize = 5;
1012
1013   FiberManager manager(folly::make_unique<SimpleLoopController>(), opts);
1014   auto& loopController =
1015       dynamic_cast<SimpleLoopController&>(manager.loopController());
1016
1017   size_t fibersRun = 0;
1018
1019   for (size_t i = 0; i < 10; ++i) {
1020     manager.addTask([&]() { ++fibersRun; });
1021   }
1022
1023   EXPECT_EQ(0, fibersRun);
1024   EXPECT_EQ(10, manager.fibersAllocated());
1025   EXPECT_EQ(0, manager.fibersPoolSize());
1026
1027   loopController.loop([&]() { loopController.stop(); });
1028
1029   EXPECT_EQ(10, fibersRun);
1030   EXPECT_EQ(5, manager.fibersAllocated());
1031   EXPECT_EQ(5, manager.fibersPoolSize());
1032 }
1033
1034 TEST(FiberManager, remoteFiberBasic) {
1035   FiberManager manager(folly::make_unique<SimpleLoopController>());
1036   auto& loopController =
1037       dynamic_cast<SimpleLoopController&>(manager.loopController());
1038
1039   int result[2];
1040   result[0] = result[1] = 0;
1041   folly::Optional<Promise<int>> savedPromise[2];
1042   manager.addTask([&]() {
1043     result[0] = await(
1044         [&](Promise<int> promise) { savedPromise[0] = std::move(promise); });
1045   });
1046   manager.addTask([&]() {
1047     result[1] = await(
1048         [&](Promise<int> promise) { savedPromise[1] = std::move(promise); });
1049   });
1050
1051   manager.loopUntilNoReady();
1052
1053   EXPECT_TRUE(savedPromise[0].hasValue());
1054   EXPECT_TRUE(savedPromise[1].hasValue());
1055   EXPECT_EQ(0, result[0]);
1056   EXPECT_EQ(0, result[1]);
1057
1058   std::thread remoteThread0{[&]() { savedPromise[0]->setValue(42); }};
1059   std::thread remoteThread1{[&]() { savedPromise[1]->setValue(43); }};
1060   remoteThread0.join();
1061   remoteThread1.join();
1062   EXPECT_EQ(0, result[0]);
1063   EXPECT_EQ(0, result[1]);
1064   /* Should only have scheduled once */
1065   EXPECT_EQ(1, loopController.remoteScheduleCalled());
1066
1067   manager.loopUntilNoReady();
1068   EXPECT_EQ(42, result[0]);
1069   EXPECT_EQ(43, result[1]);
1070 }
1071
1072 TEST(FiberManager, addTaskRemoteBasic) {
1073   FiberManager manager(folly::make_unique<SimpleLoopController>());
1074
1075   int result[2];
1076   result[0] = result[1] = 0;
1077   folly::Optional<Promise<int>> savedPromise[2];
1078
1079   std::thread remoteThread0{[&]() {
1080     manager.addTaskRemote([&]() {
1081       result[0] = await(
1082           [&](Promise<int> promise) { savedPromise[0] = std::move(promise); });
1083     });
1084   }};
1085   std::thread remoteThread1{[&]() {
1086     manager.addTaskRemote([&]() {
1087       result[1] = await(
1088           [&](Promise<int> promise) { savedPromise[1] = std::move(promise); });
1089     });
1090   }};
1091   remoteThread0.join();
1092   remoteThread1.join();
1093
1094   manager.loopUntilNoReady();
1095
1096   EXPECT_TRUE(savedPromise[0].hasValue());
1097   EXPECT_TRUE(savedPromise[1].hasValue());
1098   EXPECT_EQ(0, result[0]);
1099   EXPECT_EQ(0, result[1]);
1100
1101   savedPromise[0]->setValue(42);
1102   savedPromise[1]->setValue(43);
1103
1104   EXPECT_EQ(0, result[0]);
1105   EXPECT_EQ(0, result[1]);
1106
1107   manager.loopUntilNoReady();
1108   EXPECT_EQ(42, result[0]);
1109   EXPECT_EQ(43, result[1]);
1110 }
1111
1112 TEST(FiberManager, remoteHasTasks) {
1113   size_t counter = 0;
1114   FiberManager fm(folly::make_unique<SimpleLoopController>());
1115   std::thread remote([&]() { fm.addTaskRemote([&]() { ++counter; }); });
1116
1117   remote.join();
1118
1119   while (fm.hasTasks()) {
1120     fm.loopUntilNoReady();
1121   }
1122
1123   EXPECT_FALSE(fm.hasTasks());
1124   EXPECT_EQ(counter, 1);
1125 }
1126
1127 TEST(FiberManager, remoteHasReadyTasks) {
1128   int result = 0;
1129   folly::Optional<Promise<int>> savedPromise;
1130   FiberManager fm(folly::make_unique<SimpleLoopController>());
1131   std::thread remote([&]() {
1132     fm.addTaskRemote([&]() {
1133       result = await(
1134           [&](Promise<int> promise) { savedPromise = std::move(promise); });
1135       EXPECT_TRUE(fm.hasTasks());
1136     });
1137   });
1138
1139   remote.join();
1140   EXPECT_TRUE(fm.hasTasks());
1141
1142   fm.loopUntilNoReady();
1143   EXPECT_TRUE(fm.hasTasks());
1144
1145   std::thread remote2([&]() { savedPromise->setValue(47); });
1146   remote2.join();
1147   EXPECT_TRUE(fm.hasTasks());
1148
1149   fm.loopUntilNoReady();
1150   EXPECT_FALSE(fm.hasTasks());
1151
1152   EXPECT_EQ(result, 47);
1153 }
1154
1155 template <typename Data>
1156 void testFiberLocal() {
1157   FiberManager fm(
1158       LocalType<Data>(), folly::make_unique<SimpleLoopController>());
1159
1160   fm.addTask([]() {
1161     EXPECT_EQ(42, local<Data>().value);
1162
1163     local<Data>().value = 43;
1164
1165     addTask([]() {
1166       EXPECT_EQ(43, local<Data>().value);
1167
1168       local<Data>().value = 44;
1169
1170       addTask([]() { EXPECT_EQ(44, local<Data>().value); });
1171     });
1172   });
1173
1174   fm.addTask([&]() {
1175     EXPECT_EQ(42, local<Data>().value);
1176
1177     local<Data>().value = 43;
1178
1179     fm.addTaskRemote([]() { EXPECT_EQ(43, local<Data>().value); });
1180   });
1181
1182   fm.addTask([]() {
1183     EXPECT_EQ(42, local<Data>().value);
1184     local<Data>().value = 43;
1185
1186     auto task = []() {
1187       EXPECT_EQ(43, local<Data>().value);
1188       local<Data>().value = 44;
1189     };
1190     std::vector<std::function<void()>> tasks{task};
1191     collectAny(tasks.begin(), tasks.end());
1192
1193     EXPECT_EQ(43, local<Data>().value);
1194   });
1195
1196   fm.loopUntilNoReady();
1197   EXPECT_FALSE(fm.hasTasks());
1198 }
1199
1200 TEST(FiberManager, fiberLocal) {
1201   struct SimpleData {
1202     int value{42};
1203   };
1204
1205   testFiberLocal<SimpleData>();
1206 }
1207
1208 TEST(FiberManager, fiberLocalHeap) {
1209   struct LargeData {
1210     char _[1024 * 1024];
1211     int value{42};
1212   };
1213
1214   testFiberLocal<LargeData>();
1215 }
1216
1217 TEST(FiberManager, fiberLocalDestructor) {
1218   struct CrazyData {
1219     size_t data{42};
1220
1221     ~CrazyData() {
1222       if (data == 41) {
1223         addTask([]() {
1224           EXPECT_EQ(42, local<CrazyData>().data);
1225           // Make sure we don't have infinite loop
1226           local<CrazyData>().data = 0;
1227         });
1228       }
1229     }
1230   };
1231
1232   FiberManager fm(
1233       LocalType<CrazyData>(), folly::make_unique<SimpleLoopController>());
1234
1235   fm.addTask([]() { local<CrazyData>().data = 41; });
1236
1237   fm.loopUntilNoReady();
1238   EXPECT_FALSE(fm.hasTasks());
1239 }
1240
1241 TEST(FiberManager, yieldTest) {
1242   FiberManager manager(folly::make_unique<SimpleLoopController>());
1243   auto& loopController =
1244       dynamic_cast<SimpleLoopController&>(manager.loopController());
1245
1246   bool checkRan = false;
1247
1248   manager.addTask([&]() {
1249     manager.yield();
1250     checkRan = true;
1251   });
1252
1253   loopController.loop([&]() {
1254     if (checkRan) {
1255       loopController.stop();
1256     }
1257   });
1258
1259   EXPECT_TRUE(checkRan);
1260 }
1261
1262 TEST(FiberManager, RequestContext) {
1263   FiberManager fm(folly::make_unique<SimpleLoopController>());
1264
1265   bool checkRun1 = false;
1266   bool checkRun2 = false;
1267   bool checkRun3 = false;
1268   bool checkRun4 = false;
1269   folly::fibers::Baton baton1;
1270   folly::fibers::Baton baton2;
1271   folly::fibers::Baton baton3;
1272   folly::fibers::Baton baton4;
1273
1274   {
1275     folly::RequestContextScopeGuard rctx;
1276     auto rcontext1 = folly::RequestContext::get();
1277     fm.addTask([&, rcontext1]() {
1278       EXPECT_EQ(rcontext1, folly::RequestContext::get());
1279       baton1.wait(
1280           [&]() { EXPECT_EQ(rcontext1, folly::RequestContext::get()); });
1281       EXPECT_EQ(rcontext1, folly::RequestContext::get());
1282       runInMainContext(
1283           [&]() { EXPECT_EQ(rcontext1, folly::RequestContext::get()); });
1284       checkRun1 = true;
1285     });
1286   }
1287   {
1288     folly::RequestContextScopeGuard rctx;
1289     auto rcontext2 = folly::RequestContext::get();
1290     fm.addTaskRemote([&, rcontext2]() {
1291       EXPECT_EQ(rcontext2, folly::RequestContext::get());
1292       baton2.wait();
1293       EXPECT_EQ(rcontext2, folly::RequestContext::get());
1294       checkRun2 = true;
1295     });
1296   }
1297   {
1298     folly::RequestContextScopeGuard rctx;
1299     auto rcontext3 = folly::RequestContext::get();
1300     fm.addTaskFinally(
1301         [&, rcontext3]() {
1302           EXPECT_EQ(rcontext3, folly::RequestContext::get());
1303           baton3.wait();
1304           EXPECT_EQ(rcontext3, folly::RequestContext::get());
1305
1306           return folly::Unit();
1307         },
1308         [&, rcontext3](Try<folly::Unit>&& /* t */) {
1309           EXPECT_EQ(rcontext3, folly::RequestContext::get());
1310           checkRun3 = true;
1311         });
1312   }
1313   {
1314     folly::RequestContext::setContext(nullptr);
1315     fm.addTask([&]() {
1316       folly::RequestContextScopeGuard rctx;
1317       auto rcontext4 = folly::RequestContext::get();
1318       baton4.wait();
1319       EXPECT_EQ(rcontext4, folly::RequestContext::get());
1320       checkRun4 = true;
1321     });
1322   }
1323   {
1324     folly::RequestContextScopeGuard rctx;
1325     auto rcontext = folly::RequestContext::get();
1326
1327     fm.loopUntilNoReady();
1328     EXPECT_EQ(rcontext, folly::RequestContext::get());
1329
1330     baton1.post();
1331     EXPECT_EQ(rcontext, folly::RequestContext::get());
1332     fm.loopUntilNoReady();
1333     EXPECT_TRUE(checkRun1);
1334     EXPECT_EQ(rcontext, folly::RequestContext::get());
1335
1336     baton2.post();
1337     EXPECT_EQ(rcontext, folly::RequestContext::get());
1338     fm.loopUntilNoReady();
1339     EXPECT_TRUE(checkRun2);
1340     EXPECT_EQ(rcontext, folly::RequestContext::get());
1341
1342     baton3.post();
1343     EXPECT_EQ(rcontext, folly::RequestContext::get());
1344     fm.loopUntilNoReady();
1345     EXPECT_TRUE(checkRun3);
1346     EXPECT_EQ(rcontext, folly::RequestContext::get());
1347
1348     baton4.post();
1349     EXPECT_EQ(rcontext, folly::RequestContext::get());
1350     fm.loopUntilNoReady();
1351     EXPECT_TRUE(checkRun4);
1352     EXPECT_EQ(rcontext, folly::RequestContext::get());
1353   }
1354 }
1355
1356 TEST(FiberManager, resizePeriodically) {
1357   FiberManager::Options opts;
1358   opts.fibersPoolResizePeriodMs = 300;
1359   opts.maxFibersPoolSize = 5;
1360
1361   FiberManager manager(folly::make_unique<EventBaseLoopController>(), opts);
1362
1363   folly::EventBase evb;
1364   dynamic_cast<EventBaseLoopController&>(manager.loopController())
1365       .attachEventBase(evb);
1366
1367   std::vector<Baton> batons(10);
1368
1369   size_t tasksRun = 0;
1370   for (size_t i = 0; i < 30; ++i) {
1371     manager.addTask([i, &batons, &tasksRun]() {
1372       ++tasksRun;
1373       // Keep some fibers active indefinitely
1374       if (i < batons.size()) {
1375         batons[i].wait();
1376       }
1377     });
1378   }
1379
1380   EXPECT_EQ(0, tasksRun);
1381   EXPECT_EQ(30, manager.fibersAllocated());
1382   EXPECT_EQ(0, manager.fibersPoolSize());
1383
1384   evb.loopOnce();
1385   EXPECT_EQ(30, tasksRun);
1386   EXPECT_EQ(30, manager.fibersAllocated());
1387   // Can go over maxFibersPoolSize, 10 of 30 fibers still active
1388   EXPECT_EQ(20, manager.fibersPoolSize());
1389
1390   std::this_thread::sleep_for(std::chrono::milliseconds(400));
1391   evb.loopOnce(); // no fibers active in this period
1392   EXPECT_EQ(30, manager.fibersAllocated());
1393   EXPECT_EQ(20, manager.fibersPoolSize());
1394
1395   std::this_thread::sleep_for(std::chrono::milliseconds(400));
1396   evb.loopOnce(); // should shrink fibers pool to maxFibersPoolSize
1397   EXPECT_EQ(15, manager.fibersAllocated());
1398   EXPECT_EQ(5, manager.fibersPoolSize());
1399
1400   for (size_t i = 0; i < batons.size(); ++i) {
1401     batons[i].post();
1402   }
1403   evb.loopOnce();
1404   EXPECT_EQ(15, manager.fibersAllocated());
1405   EXPECT_EQ(15, manager.fibersPoolSize());
1406
1407   std::this_thread::sleep_for(std::chrono::milliseconds(400));
1408   evb.loopOnce(); // 10 fibers active in last period
1409   EXPECT_EQ(10, manager.fibersAllocated());
1410   EXPECT_EQ(10, manager.fibersPoolSize());
1411
1412   std::this_thread::sleep_for(std::chrono::milliseconds(400));
1413   evb.loopOnce();
1414   EXPECT_EQ(5, manager.fibersAllocated());
1415   EXPECT_EQ(5, manager.fibersPoolSize());
1416 }
1417
1418 TEST(FiberManager, batonWaitTimeoutHandler) {
1419   FiberManager manager(folly::make_unique<EventBaseLoopController>());
1420
1421   folly::EventBase evb;
1422   dynamic_cast<EventBaseLoopController&>(manager.loopController())
1423       .attachEventBase(evb);
1424
1425   size_t fibersRun = 0;
1426   Baton baton;
1427   Baton::TimeoutHandler timeoutHandler;
1428
1429   manager.addTask([&]() {
1430     baton.wait(timeoutHandler);
1431     ++fibersRun;
1432   });
1433   manager.loopUntilNoReady();
1434
1435   EXPECT_FALSE(baton.try_wait());
1436   EXPECT_EQ(0, fibersRun);
1437
1438   timeoutHandler.scheduleTimeout(std::chrono::milliseconds(250));
1439   std::this_thread::sleep_for(std::chrono::milliseconds(500));
1440
1441   EXPECT_FALSE(baton.try_wait());
1442   EXPECT_EQ(0, fibersRun);
1443
1444   evb.loopOnce();
1445   manager.loopUntilNoReady();
1446
1447   EXPECT_EQ(1, fibersRun);
1448 }
1449
1450 TEST(FiberManager, batonWaitTimeoutMany) {
1451   FiberManager manager(folly::make_unique<EventBaseLoopController>());
1452
1453   folly::EventBase evb;
1454   dynamic_cast<EventBaseLoopController&>(manager.loopController())
1455       .attachEventBase(evb);
1456
1457   constexpr size_t kNumTimeoutTasks = 10000;
1458   size_t tasksCount = kNumTimeoutTasks;
1459
1460   // We add many tasks to hit timeout queue deallocation logic.
1461   for (size_t i = 0; i < kNumTimeoutTasks; ++i) {
1462     manager.addTask([&]() {
1463       Baton baton;
1464       Baton::TimeoutHandler timeoutHandler;
1465
1466       folly::fibers::addTask([&] {
1467         timeoutHandler.scheduleTimeout(std::chrono::milliseconds(1000));
1468       });
1469
1470       baton.wait(timeoutHandler);
1471       if (--tasksCount == 0) {
1472         evb.terminateLoopSoon();
1473       }
1474     });
1475   }
1476
1477   evb.loopForever();
1478 }
1479
1480 TEST(FiberManager, remoteFutureTest) {
1481   FiberManager fiberManager(folly::make_unique<SimpleLoopController>());
1482   auto& loopController =
1483       dynamic_cast<SimpleLoopController&>(fiberManager.loopController());
1484
1485   int testValue1 = 5;
1486   int testValue2 = 7;
1487   auto f1 = fiberManager.addTaskFuture([&]() { return testValue1; });
1488   auto f2 = fiberManager.addTaskRemoteFuture([&]() { return testValue2; });
1489   loopController.loop([&]() { loopController.stop(); });
1490   auto v1 = f1.get();
1491   auto v2 = f2.get();
1492
1493   EXPECT_EQ(v1, testValue1);
1494   EXPECT_EQ(v2, testValue2);
1495 }
1496
1497 // Test that a void function produes a Future<Unit>.
1498 TEST(FiberManager, remoteFutureVoidUnitTest) {
1499   FiberManager fiberManager(folly::make_unique<SimpleLoopController>());
1500   auto& loopController =
1501       dynamic_cast<SimpleLoopController&>(fiberManager.loopController());
1502
1503   bool ranLocal = false;
1504   folly::Future<folly::Unit> futureLocal =
1505       fiberManager.addTaskFuture([&]() { ranLocal = true; });
1506
1507   bool ranRemote = false;
1508   folly::Future<folly::Unit> futureRemote =
1509       fiberManager.addTaskRemoteFuture([&]() { ranRemote = true; });
1510
1511   loopController.loop([&]() { loopController.stop(); });
1512
1513   futureLocal.wait();
1514   ASSERT_TRUE(ranLocal);
1515
1516   futureRemote.wait();
1517   ASSERT_TRUE(ranRemote);
1518 }
1519
1520 TEST(FiberManager, nestedFiberManagers) {
1521   folly::EventBase outerEvb;
1522   folly::EventBase innerEvb;
1523
1524   getFiberManager(outerEvb).addTask([&]() {
1525     EXPECT_EQ(
1526         &getFiberManager(outerEvb), FiberManager::getFiberManagerUnsafe());
1527
1528     runInMainContext([&]() {
1529       getFiberManager(innerEvb).addTask([&]() {
1530         EXPECT_EQ(
1531             &getFiberManager(innerEvb), FiberManager::getFiberManagerUnsafe());
1532
1533         innerEvb.terminateLoopSoon();
1534       });
1535
1536       innerEvb.loopForever();
1537     });
1538
1539     EXPECT_EQ(
1540         &getFiberManager(outerEvb), FiberManager::getFiberManagerUnsafe());
1541
1542     outerEvb.terminateLoopSoon();
1543   });
1544
1545   outerEvb.loopForever();
1546 }
1547
1548 TEST(FiberManager, semaphore) {
1549   constexpr size_t kTasks = 10;
1550   constexpr size_t kIterations = 10000;
1551   constexpr size_t kNumTokens = 10;
1552
1553   Semaphore sem(kNumTokens);
1554   int counterA = 0;
1555   int counterB = 0;
1556
1557   auto task = [&sem, kTasks, kIterations, kNumTokens](
1558       int& counter, folly::fibers::Baton& baton) {
1559     FiberManager manager(folly::make_unique<EventBaseLoopController>());
1560     folly::EventBase evb;
1561     dynamic_cast<EventBaseLoopController&>(manager.loopController())
1562         .attachEventBase(evb);
1563
1564     {
1565       std::shared_ptr<folly::EventBase> completionCounter(
1566           &evb, [](folly::EventBase* evb) { evb->terminateLoopSoon(); });
1567
1568       for (size_t i = 0; i < kTasks; ++i) {
1569         manager.addTask([&, completionCounter]() {
1570           for (size_t j = 0; j < kIterations; ++j) {
1571             sem.wait();
1572             ++counter;
1573             sem.signal();
1574             --counter;
1575
1576             EXPECT_LT(counter, kNumTokens);
1577             EXPECT_GE(counter, 0);
1578           }
1579         });
1580       }
1581
1582       baton.wait();
1583     }
1584     evb.loopForever();
1585   };
1586
1587   folly::fibers::Baton batonA;
1588   folly::fibers::Baton batonB;
1589   std::thread threadA([&] { task(counterA, batonA); });
1590   std::thread threadB([&] { task(counterB, batonB); });
1591
1592   batonA.post();
1593   batonB.post();
1594   threadA.join();
1595   threadB.join();
1596
1597   EXPECT_LT(counterA, kNumTokens);
1598   EXPECT_LT(counterB, kNumTokens);
1599   EXPECT_GE(counterA, 0);
1600   EXPECT_GE(counterB, 0);
1601 }
1602
1603 template <typename ExecutorT>
1604 void singleBatchDispatch(ExecutorT& executor, int batchSize, int index) {
1605   thread_local BatchDispatcher<int, std::string, ExecutorT> batchDispatcher(
1606       executor, [=](std::vector<int>&& batch) {
1607         EXPECT_EQ(batchSize, batch.size());
1608         std::vector<std::string> results;
1609         for (auto& it : batch) {
1610           results.push_back(folly::to<std::string>(it));
1611         }
1612         return results;
1613       });
1614
1615   auto indexCopy = index;
1616   auto result = batchDispatcher.add(std::move(indexCopy));
1617   EXPECT_EQ(folly::to<std::string>(index), result.get());
1618 }
1619
1620 TEST(FiberManager, batchDispatchTest) {
1621   folly::EventBase evb;
1622   auto& executor = getFiberManager(evb);
1623
1624   // Launch multiple fibers with a single id.
1625   executor.add([&]() {
1626     int batchSize = 10;
1627     for (int i = 0; i < batchSize; i++) {
1628       executor.add(
1629           [=, &executor]() { singleBatchDispatch(executor, batchSize, i); });
1630     }
1631   });
1632   evb.loop();
1633
1634   // Reuse the same BatchDispatcher to batch once again.
1635   executor.add([&]() {
1636     int batchSize = 10;
1637     for (int i = 0; i < batchSize; i++) {
1638       executor.add(
1639           [=, &executor]() { singleBatchDispatch(executor, batchSize, i); });
1640     }
1641   });
1642   evb.loop();
1643 }
1644
1645 template <typename ExecutorT>
1646 folly::Future<std::vector<std::string>> doubleBatchInnerDispatch(
1647     ExecutorT& executor,
1648     int totalNumberOfElements,
1649     std::vector<int> input) {
1650   thread_local BatchDispatcher<
1651       std::vector<int>,
1652       std::vector<std::string>,
1653       ExecutorT>
1654   batchDispatcher(executor, [=](std::vector<std::vector<int>>&& batch) {
1655     std::vector<std::vector<std::string>> results;
1656     int numberOfElements = 0;
1657     for (auto& unit : batch) {
1658       numberOfElements += unit.size();
1659       std::vector<std::string> result;
1660       for (auto& element : unit) {
1661         result.push_back(folly::to<std::string>(element));
1662       }
1663       results.push_back(std::move(result));
1664     }
1665     EXPECT_EQ(totalNumberOfElements, numberOfElements);
1666     return results;
1667   });
1668
1669   return batchDispatcher.add(std::move(input));
1670 }
1671
1672 /**
1673  * Batch values in groups of 5, and then call inner dispatch.
1674  */
1675 template <typename ExecutorT>
1676 void doubleBatchOuterDispatch(
1677     ExecutorT& executor,
1678     int totalNumberOfElements,
1679     int index) {
1680   thread_local BatchDispatcher<int, std::string, ExecutorT>
1681   batchDispatcher(executor, [=, &executor](std::vector<int>&& batch) {
1682     EXPECT_EQ(totalNumberOfElements, batch.size());
1683     std::vector<std::string> results;
1684     std::vector<folly::Future<std::vector<std::string>>>
1685         innerDispatchResultFutures;
1686
1687     std::vector<int> group;
1688     for (auto unit : batch) {
1689       group.push_back(unit);
1690       if (group.size() == 5) {
1691         auto localGroup = group;
1692         group.clear();
1693
1694         innerDispatchResultFutures.push_back(doubleBatchInnerDispatch(
1695             executor, totalNumberOfElements, localGroup));
1696       }
1697     }
1698
1699     folly::collectAll(
1700         innerDispatchResultFutures.begin(), innerDispatchResultFutures.end())
1701         .then([&](
1702             std::vector<Try<std::vector<std::string>>> innerDispatchResults) {
1703           for (auto& unit : innerDispatchResults) {
1704             for (auto& element : unit.value()) {
1705               results.push_back(element);
1706             }
1707           }
1708         })
1709         .get();
1710     return results;
1711   });
1712
1713   auto indexCopy = index;
1714   auto result = batchDispatcher.add(std::move(indexCopy));
1715   EXPECT_EQ(folly::to<std::string>(index), result.get());
1716 }
1717
1718 TEST(FiberManager, doubleBatchDispatchTest) {
1719   folly::EventBase evb;
1720   auto& executor = getFiberManager(evb);
1721
1722   // Launch multiple fibers with a single id.
1723   executor.add([&]() {
1724     int totalNumberOfElements = 20;
1725     for (int i = 0; i < totalNumberOfElements; i++) {
1726       executor.add([=, &executor]() {
1727         doubleBatchOuterDispatch(executor, totalNumberOfElements, i);
1728       });
1729     }
1730   });
1731   evb.loop();
1732 }
1733
1734 template <typename ExecutorT>
1735 void batchDispatchExceptionHandling(ExecutorT& executor, int i) {
1736   thread_local BatchDispatcher<int, int, ExecutorT> batchDispatcher(
1737       executor, [=, &executor](std::vector<int> &&) -> std::vector<int> {
1738         throw std::runtime_error("Surprise!!");
1739       });
1740
1741   EXPECT_THROW(batchDispatcher.add(i).get(), std::runtime_error);
1742 }
1743
1744 TEST(FiberManager, batchDispatchExceptionHandlingTest) {
1745   folly::EventBase evb;
1746   auto& executor = getFiberManager(evb);
1747
1748   // Launch multiple fibers with a single id.
1749   executor.add([&]() {
1750     int totalNumberOfElements = 5;
1751     for (int i = 0; i < totalNumberOfElements; i++) {
1752       executor.add(
1753           [=, &executor]() { batchDispatchExceptionHandling(executor, i); });
1754     }
1755   });
1756   evb.loop();
1757 }
1758
1759 namespace AtomicBatchDispatcherTesting {
1760
1761 using ValueT = size_t;
1762 using ResultT = std::string;
1763 using DispatchFunctionT =
1764     folly::Function<std::vector<ResultT>(std::vector<ValueT>&&)>;
1765
1766 #define ENABLE_TRACE_IN_TEST 0 // Set to 1 to debug issues in ABD tests
1767 #if ENABLE_TRACE_IN_TEST
1768 #define OUTPUT_TRACE std::cerr
1769 #else // ENABLE_TRACE_IN_TEST
1770 struct DevNullPiper {
1771   template <typename T>
1772   DevNullPiper& operator<<(const T&) {
1773     return *this;
1774   }
1775
1776   DevNullPiper& operator<<(std::ostream& (*)(std::ostream&)) {
1777     return *this;
1778   }
1779 } devNullPiper;
1780 #define OUTPUT_TRACE devNullPiper
1781 #endif // ENABLE_TRACE_IN_TEST
1782
1783 struct Job {
1784   AtomicBatchDispatcher<ValueT, ResultT>::Token token;
1785   ValueT input;
1786
1787   void preprocess(FiberManager& executor, bool die) {
1788     // Yield for a random duration [0, 10] ms to simulate I/O in preprocessing
1789     clock_t msecToDoIO = folly::Random::rand32() % 10;
1790     double start = (1000.0 * clock()) / CLOCKS_PER_SEC;
1791     double endAfter = start + msecToDoIO;
1792     while ((1000.0 * clock()) / CLOCKS_PER_SEC < endAfter) {
1793       executor.yield();
1794     }
1795     if (die) {
1796       throw std::logic_error("Simulating preprocessing failure");
1797     }
1798   }
1799
1800   Job(AtomicBatchDispatcher<ValueT, ResultT>::Token&& t, ValueT i)
1801       : token(std::move(t)), input(i) {}
1802
1803   Job(Job&&) = default;
1804   Job& operator=(Job&&) = default;
1805 };
1806
1807 ResultT processSingleInput(ValueT&& input) {
1808   return folly::to<ResultT>(std::move(input));
1809 }
1810
1811 std::vector<ResultT> userDispatchFunc(std::vector<ValueT>&& inputs) {
1812   size_t expectedCount = inputs.size();
1813   std::vector<ResultT> results;
1814   results.reserve(expectedCount);
1815   for (size_t i = 0; i < expectedCount; ++i) {
1816     results.emplace_back(processSingleInput(std::move(inputs[i])));
1817   }
1818   return results;
1819 }
1820
1821 void createJobs(
1822     AtomicBatchDispatcher<ValueT, ResultT>& atomicBatchDispatcher,
1823     std::vector<Job>& jobs,
1824     size_t count) {
1825   jobs.clear();
1826   for (size_t i = 0; i < count; ++i) {
1827     jobs.emplace_back(Job(atomicBatchDispatcher.getToken(), i));
1828   }
1829 }
1830
1831 enum class DispatchProblem {
1832   None,
1833   PreprocessThrows,
1834   DuplicateDispatch,
1835 };
1836
1837 void dispatchJobs(
1838     FiberManager& executor,
1839     std::vector<Job>& jobs,
1840     std::vector<folly::Optional<folly::Future<ResultT>>>& results,
1841     DispatchProblem dispatchProblem = DispatchProblem::None,
1842     size_t problemIndex = size_t(-1)) {
1843   EXPECT_TRUE(
1844       dispatchProblem == DispatchProblem::None || problemIndex < jobs.size());
1845   results.clear();
1846   results.resize(jobs.size());
1847   for (size_t i = 0; i < jobs.size(); ++i) {
1848     executor.add(
1849         [i, &executor, &jobs, &results, dispatchProblem, problemIndex]() {
1850           try {
1851             Job job(std::move(jobs[i]));
1852
1853             if (dispatchProblem == DispatchProblem::PreprocessThrows) {
1854               if (i == problemIndex) {
1855                 EXPECT_THROW(job.preprocess(executor, true), std::logic_error);
1856                 return;
1857               }
1858             }
1859
1860             job.preprocess(executor, false);
1861             OUTPUT_TRACE << "Dispatching job #" << i << std::endl;
1862             results[i] = job.token.dispatch(job.input);
1863             OUTPUT_TRACE << "Result future filled for job #" << i << std::endl;
1864
1865             if (dispatchProblem == DispatchProblem::DuplicateDispatch) {
1866               if (i == problemIndex) {
1867                 EXPECT_THROW(job.token.dispatch(job.input), std::logic_error);
1868               }
1869             }
1870           } catch (...) {
1871             OUTPUT_TRACE << "Preprocessing failed for job #" << i << std::endl;
1872           }
1873         });
1874   }
1875 }
1876
1877 void validateResult(
1878     std::vector<folly::Optional<folly::Future<ResultT>>>& results,
1879     size_t i) {
1880   try {
1881     OUTPUT_TRACE << "results[" << i << "].value() : " << results[i]->value()
1882                  << std::endl;
1883   } catch (std::exception& e) {
1884     OUTPUT_TRACE << "Exception : " << e.what() << std::endl;
1885     throw;
1886   }
1887 }
1888
1889 template <typename TException>
1890 void validateResults(
1891     std::vector<folly::Optional<folly::Future<ResultT>>>& results,
1892     size_t expectedNumResults) {
1893   size_t numResultsFilled = 0;
1894   for (size_t i = 0; i < results.size(); ++i) {
1895     if (!results[i]) {
1896       continue;
1897     }
1898     ++numResultsFilled;
1899     EXPECT_THROW(validateResult(results, i), TException);
1900   }
1901   EXPECT_EQ(numResultsFilled, expectedNumResults);
1902 }
1903
1904 void validateResults(
1905     std::vector<folly::Optional<folly::Future<ResultT>>>& results,
1906     size_t expectedNumResults) {
1907   size_t numResultsFilled = 0;
1908   for (size_t i = 0; i < results.size(); ++i) {
1909     if (!results[i]) {
1910       continue;
1911     }
1912     ++numResultsFilled;
1913     EXPECT_NO_THROW(validateResult(results, i));
1914     ValueT expectedInput = i;
1915     EXPECT_EQ(
1916         results[i]->value(), processSingleInput(std::move(expectedInput)));
1917   }
1918   EXPECT_EQ(numResultsFilled, expectedNumResults);
1919 }
1920
1921 } // AtomicBatchDispatcherTesting
1922
1923 #define SET_UP_TEST_FUNC                                        \
1924   using namespace AtomicBatchDispatcherTesting;                 \
1925   folly::EventBase evb;                                         \
1926   auto& executor = getFiberManager(evb);                        \
1927   const size_t COUNT = 11;                                      \
1928   std::vector<Job> jobs;                                        \
1929   jobs.reserve(COUNT);                                          \
1930   std::vector<folly::Optional<folly::Future<ResultT>>> results; \
1931   results.reserve(COUNT);                                       \
1932   DispatchFunctionT dispatchFunc
1933
1934 TEST(FiberManager, ABD_Test) {
1935   SET_UP_TEST_FUNC;
1936
1937   //
1938   // Testing AtomicBatchDispatcher with explicit call to commit()
1939   //
1940   dispatchFunc = userDispatchFunc;
1941   auto atomicBatchDispatcher =
1942       createAtomicBatchDispatcher(std::move(dispatchFunc));
1943   createJobs(atomicBatchDispatcher, jobs, COUNT);
1944   dispatchJobs(executor, jobs, results);
1945   atomicBatchDispatcher.commit();
1946   evb.loop();
1947   validateResults(results, COUNT);
1948 }
1949
1950 TEST(FiberManager, ABD_DispatcherDestroyedBeforeCallingCommit) {
1951   SET_UP_TEST_FUNC;
1952
1953   //
1954   // Testing AtomicBatchDispatcher destroyed before calling commit.
1955   // Handles error cases for:
1956   // - User might have forgotten to add the call to commit() in the code
1957   // - An unexpected exception got thrown in user code before commit() is called
1958   //
1959   try {
1960     dispatchFunc = userDispatchFunc;
1961     auto atomicBatchDispatcher =
1962         createAtomicBatchDispatcher(std::move(dispatchFunc));
1963     createJobs(atomicBatchDispatcher, jobs, COUNT);
1964     dispatchJobs(executor, jobs, results);
1965     throw std::runtime_error(
1966         "Unexpected exception in user code before commit called");
1967     atomicBatchDispatcher.commit();
1968   } catch (...) {
1969     /* User code handles the exception and does not exit process */
1970   }
1971   evb.loop();
1972   validateResults<std::logic_error>(results, COUNT);
1973 }
1974
1975 TEST(FiberManager, ABD_PreprocessingFailureTest) {
1976   SET_UP_TEST_FUNC;
1977
1978   //
1979   // Testing preprocessing failure on a job throws
1980   //
1981   dispatchFunc = userDispatchFunc;
1982   auto atomicBatchDispatcher =
1983       createAtomicBatchDispatcher(std::move(dispatchFunc));
1984   createJobs(atomicBatchDispatcher, jobs, COUNT);
1985   dispatchJobs(executor, jobs, results, DispatchProblem::PreprocessThrows, 8);
1986   atomicBatchDispatcher.commit();
1987   evb.loop();
1988   validateResults<std::logic_error>(results, COUNT - 1);
1989 }
1990
1991 TEST(FiberManager, ABD_MultipleDispatchOnSameTokenErrorTest) {
1992   SET_UP_TEST_FUNC;
1993
1994   //
1995   // Testing that calling dispatch more than once on the same token throws
1996   //
1997   dispatchFunc = userDispatchFunc;
1998   auto atomicBatchDispatcher =
1999       createAtomicBatchDispatcher(std::move(dispatchFunc));
2000   createJobs(atomicBatchDispatcher, jobs, COUNT);
2001   dispatchJobs(executor, jobs, results, DispatchProblem::DuplicateDispatch, 4);
2002   atomicBatchDispatcher.commit();
2003   evb.loop();
2004 }
2005
2006 TEST(FiberManager, ABD_GetTokenCalledAfterCommitTest) {
2007   SET_UP_TEST_FUNC;
2008
2009   //
2010   // Testing that exception set on attempt to call getToken after commit called
2011   //
2012   dispatchFunc = userDispatchFunc;
2013   auto atomicBatchDispatcher =
2014       createAtomicBatchDispatcher(std::move(dispatchFunc));
2015   createJobs(atomicBatchDispatcher, jobs, COUNT);
2016   atomicBatchDispatcher.commit();
2017   EXPECT_THROW(atomicBatchDispatcher.getToken(), std::logic_error);
2018   dispatchJobs(executor, jobs, results);
2019   EXPECT_THROW(atomicBatchDispatcher.getToken(), std::logic_error);
2020   evb.loop();
2021   validateResults(results, COUNT);
2022   EXPECT_THROW(atomicBatchDispatcher.getToken(), std::logic_error);
2023 }
2024
2025 TEST(FiberManager, ABD_UserProvidedBatchDispatchThrowsTest) {
2026   SET_UP_TEST_FUNC;
2027
2028   //
2029   // Testing that exception is set if user provided batch dispatch throws
2030   //
2031   dispatchFunc = [](std::vector<ValueT>&& inputs) -> std::vector<ResultT> {
2032     auto results = userDispatchFunc(std::move(inputs));
2033     throw std::runtime_error("Unexpected exception in user dispatch function");
2034     return results;
2035   };
2036   auto atomicBatchDispatcher =
2037       createAtomicBatchDispatcher(std::move(dispatchFunc));
2038   createJobs(atomicBatchDispatcher, jobs, COUNT);
2039   dispatchJobs(executor, jobs, results);
2040   atomicBatchDispatcher.commit();
2041   evb.loop();
2042   validateResults<std::runtime_error>(results, COUNT);
2043 }
2044
2045 TEST(FiberManager, VirtualEventBase) {
2046   folly::ScopedEventBaseThread thread;
2047
2048   auto evb1 =
2049       folly::make_unique<folly::VirtualEventBase>(*thread.getEventBase());
2050   auto evb2 =
2051       folly::make_unique<folly::VirtualEventBase>(*thread.getEventBase());
2052
2053   bool done1{false};
2054   bool done2{false};
2055
2056   getFiberManager(*evb1).addTaskRemote([&] {
2057     Baton baton;
2058     baton.timed_wait(std::chrono::milliseconds{100});
2059
2060     done1 = true;
2061   });
2062
2063   getFiberManager(*evb2).addTaskRemote([&] {
2064     Baton baton;
2065     baton.timed_wait(std::chrono::milliseconds{200});
2066
2067     done2 = true;
2068   });
2069
2070   evb1.reset();
2071   EXPECT_TRUE(done1);
2072
2073   evb2.reset();
2074   EXPECT_TRUE(done2);
2075 }
2076
2077 /**
2078  * Test that we can properly track fiber stack usage.
2079  *
2080  * This functionality can only be enabled when ASAN is disabled, so avoid
2081  * running this test with ASAN.
2082  */
2083 #ifndef FOLLY_SANITIZE_ADDRESS
2084 TEST(FiberManager, recordStack) {
2085   std::thread([] {
2086     folly::fibers::FiberManager::Options opts;
2087     opts.recordStackEvery = 1;
2088
2089     FiberManager fm(folly::make_unique<SimpleLoopController>(), opts);
2090     auto& loopController =
2091         dynamic_cast<SimpleLoopController&>(fm.loopController());
2092
2093     static constexpr size_t n = 1000;
2094     int s = 0;
2095     fm.addTask([&]() {
2096       int b[n] = {0};
2097       for (size_t i = 0; i < n; ++i) {
2098         b[i] = i;
2099       }
2100       for (size_t i = 0; i + 1 < n; ++i) {
2101         s += b[i] * b[i + 1];
2102       }
2103     });
2104
2105     (void)s;
2106
2107     loopController.loop([&]() { loopController.stop(); });
2108
2109     // Check that we properly accounted fiber stack usage.
2110     EXPECT_LT(n * sizeof(int), fm.stackHighWatermark());
2111   }).join();
2112 }
2113 #endif