Revert D4310312: [Folly] Enable -Wunreachable-code
[folly.git] / folly / fibers / test / FibersTest.cpp
1 /*
2  * Copyright 2016 Facebook, Inc.
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  *   http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 #include <atomic>
17 #include <thread>
18 #include <vector>
19
20 #include <folly/Memory.h>
21 #include <folly/Random.h>
22 #include <folly/futures/Future.h>
23
24 #include <folly/Conv.h>
25 #include <folly/fibers/AddTasks.h>
26 #include <folly/fibers/AtomicBatchDispatcher.h>
27 #include <folly/fibers/BatchDispatcher.h>
28 #include <folly/fibers/EventBaseLoopController.h>
29 #include <folly/fibers/FiberManager.h>
30 #include <folly/fibers/FiberManagerMap.h>
31 #include <folly/fibers/GenericBaton.h>
32 #include <folly/fibers/Semaphore.h>
33 #include <folly/fibers/SimpleLoopController.h>
34 #include <folly/fibers/WhenN.h>
35 #include <folly/io/async/ScopedEventBaseThread.h>
36 #include <folly/portability/GTest.h>
37
38 using namespace folly::fibers;
39
40 using folly::Try;
41
42 TEST(FiberManager, batonTimedWaitTimeout) {
43   bool taskAdded = false;
44   size_t iterations = 0;
45
46   FiberManager manager(folly::make_unique<SimpleLoopController>());
47   auto& loopController =
48       dynamic_cast<SimpleLoopController&>(manager.loopController());
49
50   auto loopFunc = [&]() {
51     if (!taskAdded) {
52       manager.addTask([&]() {
53         Baton baton;
54
55         auto res = baton.timed_wait(std::chrono::milliseconds(230));
56
57         EXPECT_FALSE(res);
58         EXPECT_EQ(5, iterations);
59
60         loopController.stop();
61       });
62       manager.addTask([&]() {
63         Baton baton;
64
65         auto res = baton.timed_wait(std::chrono::milliseconds(130));
66
67         EXPECT_FALSE(res);
68         EXPECT_EQ(3, iterations);
69
70         loopController.stop();
71       });
72       taskAdded = true;
73     } else {
74       std::this_thread::sleep_for(std::chrono::milliseconds(50));
75       iterations++;
76     }
77   };
78
79   loopController.loop(std::move(loopFunc));
80 }
81
82 TEST(FiberManager, batonTimedWaitPost) {
83   bool taskAdded = false;
84   size_t iterations = 0;
85   Baton* baton_ptr;
86
87   FiberManager manager(folly::make_unique<SimpleLoopController>());
88   auto& loopController =
89       dynamic_cast<SimpleLoopController&>(manager.loopController());
90
91   auto loopFunc = [&]() {
92     if (!taskAdded) {
93       manager.addTask([&]() {
94         Baton baton;
95         baton_ptr = &baton;
96
97         auto res = baton.timed_wait(std::chrono::milliseconds(130));
98
99         EXPECT_TRUE(res);
100         EXPECT_EQ(2, iterations);
101
102         loopController.stop();
103       });
104       taskAdded = true;
105     } else {
106       std::this_thread::sleep_for(std::chrono::milliseconds(50));
107       iterations++;
108       if (iterations == 2) {
109         baton_ptr->post();
110       }
111     }
112   };
113
114   loopController.loop(std::move(loopFunc));
115 }
116
117 TEST(FiberManager, batonTimedWaitTimeoutEvb) {
118   size_t tasksComplete = 0;
119
120   folly::EventBase evb;
121
122   FiberManager manager(folly::make_unique<EventBaseLoopController>());
123   dynamic_cast<EventBaseLoopController&>(manager.loopController())
124       .attachEventBase(evb);
125
126   auto task = [&](size_t timeout_ms) {
127     Baton baton;
128
129     auto start = EventBaseLoopController::Clock::now();
130     auto res = baton.timed_wait(std::chrono::milliseconds(timeout_ms));
131     auto finish = EventBaseLoopController::Clock::now();
132
133     EXPECT_FALSE(res);
134
135     auto duration_ms =
136         std::chrono::duration_cast<std::chrono::milliseconds>(finish - start);
137
138     EXPECT_GT(duration_ms.count(), timeout_ms - 50);
139     EXPECT_LT(duration_ms.count(), timeout_ms + 50);
140
141     if (++tasksComplete == 2) {
142       evb.terminateLoopSoon();
143     }
144   };
145
146   evb.runInEventBaseThread([&]() {
147     manager.addTask([&]() { task(500); });
148     manager.addTask([&]() { task(250); });
149   });
150
151   evb.loopForever();
152
153   EXPECT_EQ(2, tasksComplete);
154 }
155
156 TEST(FiberManager, batonTimedWaitPostEvb) {
157   size_t tasksComplete = 0;
158
159   folly::EventBase evb;
160
161   FiberManager manager(folly::make_unique<EventBaseLoopController>());
162   dynamic_cast<EventBaseLoopController&>(manager.loopController())
163       .attachEventBase(evb);
164
165   evb.runInEventBaseThread([&]() {
166     manager.addTask([&]() {
167       Baton baton;
168
169       evb.tryRunAfterDelay([&]() { baton.post(); }, 100);
170
171       auto start = EventBaseLoopController::Clock::now();
172       auto res = baton.timed_wait(std::chrono::milliseconds(130));
173       auto finish = EventBaseLoopController::Clock::now();
174
175       EXPECT_TRUE(res);
176
177       auto duration_ms =
178           std::chrono::duration_cast<std::chrono::milliseconds>(finish - start);
179
180       EXPECT_TRUE(duration_ms.count() > 95 && duration_ms.count() < 110);
181
182       if (++tasksComplete == 1) {
183         evb.terminateLoopSoon();
184       }
185     });
186   });
187
188   evb.loopForever();
189
190   EXPECT_EQ(1, tasksComplete);
191 }
192
193 TEST(FiberManager, batonTryWait) {
194   FiberManager manager(folly::make_unique<SimpleLoopController>());
195
196   // Check if try_wait and post work as expected
197   Baton b;
198
199   manager.addTask([&]() {
200     while (!b.try_wait()) {
201     }
202   });
203   auto thr = std::thread([&]() {
204     std::this_thread::sleep_for(std::chrono::milliseconds(300));
205     b.post();
206   });
207
208   manager.loopUntilNoReady();
209   thr.join();
210
211   Baton c;
212
213   // Check try_wait without post
214   manager.addTask([&]() {
215     int cnt = 100;
216     while (cnt && !c.try_wait()) {
217       cnt--;
218     }
219     EXPECT_TRUE(!c.try_wait()); // must still hold
220     EXPECT_EQ(cnt, 0);
221   });
222
223   manager.loopUntilNoReady();
224 }
225
226 TEST(FiberManager, genericBatonFiberWait) {
227   FiberManager manager(folly::make_unique<SimpleLoopController>());
228
229   GenericBaton b;
230   bool fiberRunning = false;
231
232   manager.addTask([&]() {
233     EXPECT_EQ(manager.hasActiveFiber(), true);
234     fiberRunning = true;
235     b.wait();
236     fiberRunning = false;
237   });
238
239   EXPECT_FALSE(fiberRunning);
240   manager.loopUntilNoReady();
241   EXPECT_TRUE(fiberRunning); // ensure fiber still active
242
243   auto thr = std::thread([&]() {
244     std::this_thread::sleep_for(std::chrono::milliseconds(300));
245     b.post();
246   });
247
248   while (fiberRunning) {
249     manager.loopUntilNoReady();
250   }
251
252   thr.join();
253 }
254
255 TEST(FiberManager, genericBatonThreadWait) {
256   FiberManager manager(folly::make_unique<SimpleLoopController>());
257   GenericBaton b;
258   std::atomic<bool> threadWaiting(false);
259
260   auto thr = std::thread([&]() {
261     threadWaiting = true;
262     b.wait();
263     threadWaiting = false;
264   });
265
266   while (!threadWaiting) {
267   }
268   std::this_thread::sleep_for(std::chrono::milliseconds(300));
269
270   manager.addTask([&]() {
271     EXPECT_EQ(manager.hasActiveFiber(), true);
272     EXPECT_TRUE(threadWaiting);
273     b.post();
274     while (threadWaiting) {
275     }
276   });
277
278   manager.loopUntilNoReady();
279   thr.join();
280 }
281
282 TEST(FiberManager, addTasksNoncopyable) {
283   std::vector<Promise<int>> pendingFibers;
284   bool taskAdded = false;
285
286   FiberManager manager(folly::make_unique<SimpleLoopController>());
287   auto& loopController =
288       dynamic_cast<SimpleLoopController&>(manager.loopController());
289
290   auto loopFunc = [&]() {
291     if (!taskAdded) {
292       manager.addTask([&]() {
293         std::vector<std::function<std::unique_ptr<int>()>> funcs;
294         for (size_t i = 0; i < 3; ++i) {
295           funcs.push_back([i, &pendingFibers]() {
296             await([&pendingFibers](Promise<int> promise) {
297               pendingFibers.push_back(std::move(promise));
298             });
299             return folly::make_unique<int>(i * 2 + 1);
300           });
301         }
302
303         auto iter = addTasks(funcs.begin(), funcs.end());
304
305         size_t n = 0;
306         while (iter.hasNext()) {
307           auto result = iter.awaitNext();
308           EXPECT_EQ(2 * iter.getTaskID() + 1, *result);
309           EXPECT_GE(2 - n, pendingFibers.size());
310           ++n;
311         }
312         EXPECT_EQ(3, n);
313       });
314       taskAdded = true;
315     } else if (pendingFibers.size()) {
316       pendingFibers.back().setValue(0);
317       pendingFibers.pop_back();
318     } else {
319       loopController.stop();
320     }
321   };
322
323   loopController.loop(std::move(loopFunc));
324 }
325
326 TEST(FiberManager, awaitThrow) {
327   folly::EventBase evb;
328   struct ExpectedException {};
329   getFiberManager(evb)
330       .addTaskFuture([&] {
331         EXPECT_THROW(
332           await([](Promise<int> p) {
333               p.setValue(42);
334               throw ExpectedException();
335             }),
336           ExpectedException
337         );
338
339         EXPECT_THROW(
340           await([&](Promise<int> p) {
341               evb.runInEventBaseThread([p = std::move(p)]() mutable {
342                   p.setValue(42);
343                 });
344               throw ExpectedException();
345             }),
346           ExpectedException);
347       })
348       .waitVia(&evb);
349 }
350
351 TEST(FiberManager, addTasksThrow) {
352   std::vector<Promise<int>> pendingFibers;
353   bool taskAdded = false;
354
355   FiberManager manager(folly::make_unique<SimpleLoopController>());
356   auto& loopController =
357       dynamic_cast<SimpleLoopController&>(manager.loopController());
358
359   auto loopFunc = [&]() {
360     if (!taskAdded) {
361       manager.addTask([&]() {
362         std::vector<std::function<int()>> funcs;
363         for (size_t i = 0; i < 3; ++i) {
364           funcs.push_back([i, &pendingFibers]() {
365             await([&pendingFibers](Promise<int> promise) {
366               pendingFibers.push_back(std::move(promise));
367             });
368             if (i % 2 == 0) {
369               throw std::runtime_error("Runtime");
370             }
371             return i * 2 + 1;
372           });
373         }
374
375         auto iter = addTasks(funcs.begin(), funcs.end());
376
377         size_t n = 0;
378         while (iter.hasNext()) {
379           try {
380             int result = iter.awaitNext();
381             EXPECT_EQ(1, iter.getTaskID() % 2);
382             EXPECT_EQ(2 * iter.getTaskID() + 1, result);
383           } catch (...) {
384             EXPECT_EQ(0, iter.getTaskID() % 2);
385           }
386           EXPECT_GE(2 - n, pendingFibers.size());
387           ++n;
388         }
389         EXPECT_EQ(3, n);
390       });
391       taskAdded = true;
392     } else if (pendingFibers.size()) {
393       pendingFibers.back().setValue(0);
394       pendingFibers.pop_back();
395     } else {
396       loopController.stop();
397     }
398   };
399
400   loopController.loop(std::move(loopFunc));
401 }
402
403 TEST(FiberManager, addTasksVoid) {
404   std::vector<Promise<int>> pendingFibers;
405   bool taskAdded = false;
406
407   FiberManager manager(folly::make_unique<SimpleLoopController>());
408   auto& loopController =
409       dynamic_cast<SimpleLoopController&>(manager.loopController());
410
411   auto loopFunc = [&]() {
412     if (!taskAdded) {
413       manager.addTask([&]() {
414         std::vector<std::function<void()>> funcs;
415         for (size_t i = 0; i < 3; ++i) {
416           funcs.push_back([i, &pendingFibers]() {
417             await([&pendingFibers](Promise<int> promise) {
418               pendingFibers.push_back(std::move(promise));
419             });
420           });
421         }
422
423         auto iter = addTasks(funcs.begin(), funcs.end());
424
425         size_t n = 0;
426         while (iter.hasNext()) {
427           iter.awaitNext();
428           EXPECT_GE(2 - n, pendingFibers.size());
429           ++n;
430         }
431         EXPECT_EQ(3, n);
432       });
433       taskAdded = true;
434     } else if (pendingFibers.size()) {
435       pendingFibers.back().setValue(0);
436       pendingFibers.pop_back();
437     } else {
438       loopController.stop();
439     }
440   };
441
442   loopController.loop(std::move(loopFunc));
443 }
444
445 TEST(FiberManager, addTasksVoidThrow) {
446   std::vector<Promise<int>> pendingFibers;
447   bool taskAdded = false;
448
449   FiberManager manager(folly::make_unique<SimpleLoopController>());
450   auto& loopController =
451       dynamic_cast<SimpleLoopController&>(manager.loopController());
452
453   auto loopFunc = [&]() {
454     if (!taskAdded) {
455       manager.addTask([&]() {
456         std::vector<std::function<void()>> funcs;
457         for (size_t i = 0; i < 3; ++i) {
458           funcs.push_back([i, &pendingFibers]() {
459             await([&pendingFibers](Promise<int> promise) {
460               pendingFibers.push_back(std::move(promise));
461             });
462             if (i % 2 == 0) {
463               throw std::runtime_error("");
464             }
465           });
466         }
467
468         auto iter = addTasks(funcs.begin(), funcs.end());
469
470         size_t n = 0;
471         while (iter.hasNext()) {
472           try {
473             iter.awaitNext();
474             EXPECT_EQ(1, iter.getTaskID() % 2);
475           } catch (...) {
476             EXPECT_EQ(0, iter.getTaskID() % 2);
477           }
478           EXPECT_GE(2 - n, pendingFibers.size());
479           ++n;
480         }
481         EXPECT_EQ(3, n);
482       });
483       taskAdded = true;
484     } else if (pendingFibers.size()) {
485       pendingFibers.back().setValue(0);
486       pendingFibers.pop_back();
487     } else {
488       loopController.stop();
489     }
490   };
491
492   loopController.loop(std::move(loopFunc));
493 }
494
495 TEST(FiberManager, addTasksReserve) {
496   std::vector<Promise<int>> pendingFibers;
497   bool taskAdded = false;
498
499   FiberManager manager(folly::make_unique<SimpleLoopController>());
500   auto& loopController =
501       dynamic_cast<SimpleLoopController&>(manager.loopController());
502
503   auto loopFunc = [&]() {
504     if (!taskAdded) {
505       manager.addTask([&]() {
506         std::vector<std::function<void()>> funcs;
507         for (size_t i = 0; i < 3; ++i) {
508           funcs.push_back([&pendingFibers]() {
509             await([&pendingFibers](Promise<int> promise) {
510               pendingFibers.push_back(std::move(promise));
511             });
512           });
513         }
514
515         auto iter = addTasks(funcs.begin(), funcs.end());
516
517         iter.reserve(2);
518         EXPECT_TRUE(iter.hasCompleted());
519         EXPECT_TRUE(iter.hasPending());
520         EXPECT_TRUE(iter.hasNext());
521
522         iter.awaitNext();
523         EXPECT_TRUE(iter.hasCompleted());
524         EXPECT_TRUE(iter.hasPending());
525         EXPECT_TRUE(iter.hasNext());
526
527         iter.awaitNext();
528         EXPECT_FALSE(iter.hasCompleted());
529         EXPECT_TRUE(iter.hasPending());
530         EXPECT_TRUE(iter.hasNext());
531
532         iter.awaitNext();
533         EXPECT_FALSE(iter.hasCompleted());
534         EXPECT_FALSE(iter.hasPending());
535         EXPECT_FALSE(iter.hasNext());
536       });
537       taskAdded = true;
538     } else if (pendingFibers.size()) {
539       pendingFibers.back().setValue(0);
540       pendingFibers.pop_back();
541     } else {
542       loopController.stop();
543     }
544   };
545
546   loopController.loop(std::move(loopFunc));
547 }
548
549 TEST(FiberManager, addTaskDynamic) {
550   folly::EventBase evb;
551
552   Baton batons[3];
553
554   auto makeTask = [&](size_t taskId) {
555     return [&, taskId]() -> size_t {
556       batons[taskId].wait();
557       return taskId;
558     };
559   };
560
561   getFiberManager(evb)
562       .addTaskFuture([&]() {
563         TaskIterator<size_t> iterator;
564
565         iterator.addTask(makeTask(0));
566         iterator.addTask(makeTask(1));
567
568         batons[1].post();
569
570         EXPECT_EQ(1, iterator.awaitNext());
571
572         iterator.addTask(makeTask(2));
573
574         batons[2].post();
575
576         EXPECT_EQ(2, iterator.awaitNext());
577
578         batons[0].post();
579
580         EXPECT_EQ(0, iterator.awaitNext());
581       })
582       .waitVia(&evb);
583 }
584
585 TEST(FiberManager, forEach) {
586   std::vector<Promise<int>> pendingFibers;
587   bool taskAdded = false;
588
589   FiberManager manager(folly::make_unique<SimpleLoopController>());
590   auto& loopController =
591       dynamic_cast<SimpleLoopController&>(manager.loopController());
592
593   auto loopFunc = [&]() {
594     if (!taskAdded) {
595       manager.addTask([&]() {
596         std::vector<std::function<int()>> funcs;
597         for (size_t i = 0; i < 3; ++i) {
598           funcs.push_back([i, &pendingFibers]() {
599             await([&pendingFibers](Promise<int> promise) {
600               pendingFibers.push_back(std::move(promise));
601             });
602             return i * 2 + 1;
603           });
604         }
605
606         std::vector<std::pair<size_t, int>> results;
607         forEach(funcs.begin(), funcs.end(), [&results](size_t id, int result) {
608           results.emplace_back(id, result);
609         });
610         EXPECT_EQ(3, results.size());
611         EXPECT_TRUE(pendingFibers.empty());
612         for (size_t i = 0; i < 3; ++i) {
613           EXPECT_EQ(results[i].first * 2 + 1, results[i].second);
614         }
615       });
616       taskAdded = true;
617     } else if (pendingFibers.size()) {
618       pendingFibers.back().setValue(0);
619       pendingFibers.pop_back();
620     } else {
621       loopController.stop();
622     }
623   };
624
625   loopController.loop(std::move(loopFunc));
626 }
627
628 TEST(FiberManager, collectN) {
629   std::vector<Promise<int>> pendingFibers;
630   bool taskAdded = false;
631
632   FiberManager manager(folly::make_unique<SimpleLoopController>());
633   auto& loopController =
634       dynamic_cast<SimpleLoopController&>(manager.loopController());
635
636   auto loopFunc = [&]() {
637     if (!taskAdded) {
638       manager.addTask([&]() {
639         std::vector<std::function<int()>> funcs;
640         for (size_t i = 0; i < 3; ++i) {
641           funcs.push_back([i, &pendingFibers]() {
642             await([&pendingFibers](Promise<int> promise) {
643               pendingFibers.push_back(std::move(promise));
644             });
645             return i * 2 + 1;
646           });
647         }
648
649         auto results = collectN(funcs.begin(), funcs.end(), 2);
650         EXPECT_EQ(2, results.size());
651         EXPECT_EQ(1, pendingFibers.size());
652         for (size_t i = 0; i < 2; ++i) {
653           EXPECT_EQ(results[i].first * 2 + 1, results[i].second);
654         }
655       });
656       taskAdded = true;
657     } else if (pendingFibers.size()) {
658       pendingFibers.back().setValue(0);
659       pendingFibers.pop_back();
660     } else {
661       loopController.stop();
662     }
663   };
664
665   loopController.loop(std::move(loopFunc));
666 }
667
668 TEST(FiberManager, collectNThrow) {
669   std::vector<Promise<int>> pendingFibers;
670   bool taskAdded = false;
671
672   FiberManager manager(folly::make_unique<SimpleLoopController>());
673   auto& loopController =
674       dynamic_cast<SimpleLoopController&>(manager.loopController());
675
676   auto loopFunc = [&]() {
677     if (!taskAdded) {
678       manager.addTask([&]() {
679         std::vector<std::function<int()>> funcs;
680         for (size_t i = 0; i < 3; ++i) {
681           funcs.push_back([i, &pendingFibers]() -> size_t {
682             await([&pendingFibers](Promise<int> promise) {
683               pendingFibers.push_back(std::move(promise));
684             });
685             throw std::runtime_error("Runtime");
686           });
687         }
688
689         try {
690           collectN(funcs.begin(), funcs.end(), 2);
691         } catch (...) {
692           EXPECT_EQ(1, pendingFibers.size());
693         }
694       });
695       taskAdded = true;
696     } else if (pendingFibers.size()) {
697       pendingFibers.back().setValue(0);
698       pendingFibers.pop_back();
699     } else {
700       loopController.stop();
701     }
702   };
703
704   loopController.loop(std::move(loopFunc));
705 }
706
707 TEST(FiberManager, collectNVoid) {
708   std::vector<Promise<int>> pendingFibers;
709   bool taskAdded = false;
710
711   FiberManager manager(folly::make_unique<SimpleLoopController>());
712   auto& loopController =
713       dynamic_cast<SimpleLoopController&>(manager.loopController());
714
715   auto loopFunc = [&]() {
716     if (!taskAdded) {
717       manager.addTask([&]() {
718         std::vector<std::function<void()>> funcs;
719         for (size_t i = 0; i < 3; ++i) {
720           funcs.push_back([i, &pendingFibers]() {
721             await([&pendingFibers](Promise<int> promise) {
722               pendingFibers.push_back(std::move(promise));
723             });
724           });
725         }
726
727         auto results = collectN(funcs.begin(), funcs.end(), 2);
728         EXPECT_EQ(2, results.size());
729         EXPECT_EQ(1, pendingFibers.size());
730       });
731       taskAdded = true;
732     } else if (pendingFibers.size()) {
733       pendingFibers.back().setValue(0);
734       pendingFibers.pop_back();
735     } else {
736       loopController.stop();
737     }
738   };
739
740   loopController.loop(std::move(loopFunc));
741 }
742
743 TEST(FiberManager, collectNVoidThrow) {
744   std::vector<Promise<int>> pendingFibers;
745   bool taskAdded = false;
746
747   FiberManager manager(folly::make_unique<SimpleLoopController>());
748   auto& loopController =
749       dynamic_cast<SimpleLoopController&>(manager.loopController());
750
751   auto loopFunc = [&]() {
752     if (!taskAdded) {
753       manager.addTask([&]() {
754         std::vector<std::function<void()>> funcs;
755         for (size_t i = 0; i < 3; ++i) {
756           funcs.push_back([i, &pendingFibers]() {
757             await([&pendingFibers](Promise<int> promise) {
758               pendingFibers.push_back(std::move(promise));
759             });
760             throw std::runtime_error("Runtime");
761           });
762         }
763
764         try {
765           collectN(funcs.begin(), funcs.end(), 2);
766         } catch (...) {
767           EXPECT_EQ(1, pendingFibers.size());
768         }
769       });
770       taskAdded = true;
771     } else if (pendingFibers.size()) {
772       pendingFibers.back().setValue(0);
773       pendingFibers.pop_back();
774     } else {
775       loopController.stop();
776     }
777   };
778
779   loopController.loop(std::move(loopFunc));
780 }
781
782 TEST(FiberManager, collectAll) {
783   std::vector<Promise<int>> pendingFibers;
784   bool taskAdded = false;
785
786   FiberManager manager(folly::make_unique<SimpleLoopController>());
787   auto& loopController =
788       dynamic_cast<SimpleLoopController&>(manager.loopController());
789
790   auto loopFunc = [&]() {
791     if (!taskAdded) {
792       manager.addTask([&]() {
793         std::vector<std::function<int()>> funcs;
794         for (size_t i = 0; i < 3; ++i) {
795           funcs.push_back([i, &pendingFibers]() {
796             await([&pendingFibers](Promise<int> promise) {
797               pendingFibers.push_back(std::move(promise));
798             });
799             return i * 2 + 1;
800           });
801         }
802
803         auto results = collectAll(funcs.begin(), funcs.end());
804         EXPECT_TRUE(pendingFibers.empty());
805         for (size_t i = 0; i < 3; ++i) {
806           EXPECT_EQ(i * 2 + 1, results[i]);
807         }
808       });
809       taskAdded = true;
810     } else if (pendingFibers.size()) {
811       pendingFibers.back().setValue(0);
812       pendingFibers.pop_back();
813     } else {
814       loopController.stop();
815     }
816   };
817
818   loopController.loop(std::move(loopFunc));
819 }
820
821 TEST(FiberManager, collectAllVoid) {
822   std::vector<Promise<int>> pendingFibers;
823   bool taskAdded = false;
824
825   FiberManager manager(folly::make_unique<SimpleLoopController>());
826   auto& loopController =
827       dynamic_cast<SimpleLoopController&>(manager.loopController());
828
829   auto loopFunc = [&]() {
830     if (!taskAdded) {
831       manager.addTask([&]() {
832         std::vector<std::function<void()>> funcs;
833         for (size_t i = 0; i < 3; ++i) {
834           funcs.push_back([i, &pendingFibers]() {
835             await([&pendingFibers](Promise<int> promise) {
836               pendingFibers.push_back(std::move(promise));
837             });
838           });
839         }
840
841         collectAll(funcs.begin(), funcs.end());
842         EXPECT_TRUE(pendingFibers.empty());
843       });
844       taskAdded = true;
845     } else if (pendingFibers.size()) {
846       pendingFibers.back().setValue(0);
847       pendingFibers.pop_back();
848     } else {
849       loopController.stop();
850     }
851   };
852
853   loopController.loop(std::move(loopFunc));
854 }
855
856 TEST(FiberManager, collectAny) {
857   std::vector<Promise<int>> pendingFibers;
858   bool taskAdded = false;
859
860   FiberManager manager(folly::make_unique<SimpleLoopController>());
861   auto& loopController =
862       dynamic_cast<SimpleLoopController&>(manager.loopController());
863
864   auto loopFunc = [&]() {
865     if (!taskAdded) {
866       manager.addTask([&]() {
867         std::vector<std::function<int()>> funcs;
868         for (size_t i = 0; i < 3; ++i) {
869           funcs.push_back([i, &pendingFibers]() {
870             await([&pendingFibers](Promise<int> promise) {
871               pendingFibers.push_back(std::move(promise));
872             });
873             if (i == 1) {
874               throw std::runtime_error("This exception will be ignored");
875             }
876             return i * 2 + 1;
877           });
878         }
879
880         auto result = collectAny(funcs.begin(), funcs.end());
881         EXPECT_EQ(2, pendingFibers.size());
882         EXPECT_EQ(2, result.first);
883         EXPECT_EQ(2 * 2 + 1, result.second);
884       });
885       taskAdded = true;
886     } else if (pendingFibers.size()) {
887       pendingFibers.back().setValue(0);
888       pendingFibers.pop_back();
889     } else {
890       loopController.stop();
891     }
892   };
893
894   loopController.loop(std::move(loopFunc));
895 }
896
897 namespace {
898 /* Checks that this function was run from a main context,
899    by comparing an address on a stack to a known main stack address
900    and a known related fiber stack address.  The assumption
901    is that fiber stack and main stack will be far enough apart,
902    while any two values on the same stack will be close. */
903 void expectMainContext(bool& ran, int* mainLocation, int* fiberLocation) {
904   int here;
905   /* 2 pages is a good guess */
906   constexpr ssize_t DISTANCE = 0x2000 / sizeof(int);
907   if (fiberLocation) {
908     EXPECT_TRUE(std::abs(&here - fiberLocation) > DISTANCE);
909   }
910   if (mainLocation) {
911     EXPECT_TRUE(std::abs(&here - mainLocation) < DISTANCE);
912   }
913
914   EXPECT_FALSE(ran);
915   ran = true;
916 }
917 }
918
919 TEST(FiberManager, runInMainContext) {
920   FiberManager manager(folly::make_unique<SimpleLoopController>());
921   auto& loopController =
922       dynamic_cast<SimpleLoopController&>(manager.loopController());
923
924   bool checkRan = false;
925
926   int mainLocation;
927   manager.runInMainContext(
928       [&]() { expectMainContext(checkRan, &mainLocation, nullptr); });
929   EXPECT_TRUE(checkRan);
930
931   checkRan = false;
932
933   manager.addTask([&]() {
934     struct A {
935       explicit A(int value_) : value(value_) {}
936       A(const A&) = delete;
937       A(A&&) = default;
938
939       int value;
940     };
941     int stackLocation;
942     auto ret = runInMainContext([&]() {
943       expectMainContext(checkRan, &mainLocation, &stackLocation);
944       return A(42);
945     });
946     EXPECT_TRUE(checkRan);
947     EXPECT_EQ(42, ret.value);
948   });
949
950   loopController.loop([&]() { loopController.stop(); });
951
952   EXPECT_TRUE(checkRan);
953 }
954
955 TEST(FiberManager, addTaskFinally) {
956   FiberManager manager(folly::make_unique<SimpleLoopController>());
957   auto& loopController =
958       dynamic_cast<SimpleLoopController&>(manager.loopController());
959
960   bool checkRan = false;
961
962   int mainLocation;
963
964   manager.addTaskFinally(
965       [&]() { return 1234; },
966       [&](Try<int>&& result) {
967         EXPECT_EQ(result.value(), 1234);
968
969         expectMainContext(checkRan, &mainLocation, nullptr);
970       });
971
972   EXPECT_FALSE(checkRan);
973
974   loopController.loop([&]() { loopController.stop(); });
975
976   EXPECT_TRUE(checkRan);
977 }
978
979 TEST(FiberManager, fibersPoolWithinLimit) {
980   FiberManager::Options opts;
981   opts.maxFibersPoolSize = 5;
982
983   FiberManager manager(folly::make_unique<SimpleLoopController>(), opts);
984   auto& loopController =
985       dynamic_cast<SimpleLoopController&>(manager.loopController());
986
987   size_t fibersRun = 0;
988
989   for (size_t i = 0; i < 5; ++i) {
990     manager.addTask([&]() { ++fibersRun; });
991   }
992   loopController.loop([&]() { loopController.stop(); });
993
994   EXPECT_EQ(5, fibersRun);
995   EXPECT_EQ(5, manager.fibersAllocated());
996   EXPECT_EQ(5, manager.fibersPoolSize());
997
998   for (size_t i = 0; i < 5; ++i) {
999     manager.addTask([&]() { ++fibersRun; });
1000   }
1001   loopController.loop([&]() { loopController.stop(); });
1002
1003   EXPECT_EQ(10, fibersRun);
1004   EXPECT_EQ(5, manager.fibersAllocated());
1005   EXPECT_EQ(5, manager.fibersPoolSize());
1006 }
1007
1008 TEST(FiberManager, fibersPoolOverLimit) {
1009   FiberManager::Options opts;
1010   opts.maxFibersPoolSize = 5;
1011
1012   FiberManager manager(folly::make_unique<SimpleLoopController>(), opts);
1013   auto& loopController =
1014       dynamic_cast<SimpleLoopController&>(manager.loopController());
1015
1016   size_t fibersRun = 0;
1017
1018   for (size_t i = 0; i < 10; ++i) {
1019     manager.addTask([&]() { ++fibersRun; });
1020   }
1021
1022   EXPECT_EQ(0, fibersRun);
1023   EXPECT_EQ(10, manager.fibersAllocated());
1024   EXPECT_EQ(0, manager.fibersPoolSize());
1025
1026   loopController.loop([&]() { loopController.stop(); });
1027
1028   EXPECT_EQ(10, fibersRun);
1029   EXPECT_EQ(5, manager.fibersAllocated());
1030   EXPECT_EQ(5, manager.fibersPoolSize());
1031 }
1032
1033 TEST(FiberManager, remoteFiberBasic) {
1034   FiberManager manager(folly::make_unique<SimpleLoopController>());
1035   auto& loopController =
1036       dynamic_cast<SimpleLoopController&>(manager.loopController());
1037
1038   int result[2];
1039   result[0] = result[1] = 0;
1040   folly::Optional<Promise<int>> savedPromise[2];
1041   manager.addTask([&]() {
1042     result[0] = await(
1043         [&](Promise<int> promise) { savedPromise[0] = std::move(promise); });
1044   });
1045   manager.addTask([&]() {
1046     result[1] = await(
1047         [&](Promise<int> promise) { savedPromise[1] = std::move(promise); });
1048   });
1049
1050   manager.loopUntilNoReady();
1051
1052   EXPECT_TRUE(savedPromise[0].hasValue());
1053   EXPECT_TRUE(savedPromise[1].hasValue());
1054   EXPECT_EQ(0, result[0]);
1055   EXPECT_EQ(0, result[1]);
1056
1057   std::thread remoteThread0{[&]() { savedPromise[0]->setValue(42); }};
1058   std::thread remoteThread1{[&]() { savedPromise[1]->setValue(43); }};
1059   remoteThread0.join();
1060   remoteThread1.join();
1061   EXPECT_EQ(0, result[0]);
1062   EXPECT_EQ(0, result[1]);
1063   /* Should only have scheduled once */
1064   EXPECT_EQ(1, loopController.remoteScheduleCalled());
1065
1066   manager.loopUntilNoReady();
1067   EXPECT_EQ(42, result[0]);
1068   EXPECT_EQ(43, result[1]);
1069 }
1070
1071 TEST(FiberManager, addTaskRemoteBasic) {
1072   FiberManager manager(folly::make_unique<SimpleLoopController>());
1073
1074   int result[2];
1075   result[0] = result[1] = 0;
1076   folly::Optional<Promise<int>> savedPromise[2];
1077
1078   std::thread remoteThread0{[&]() {
1079     manager.addTaskRemote([&]() {
1080       result[0] = await(
1081           [&](Promise<int> promise) { savedPromise[0] = std::move(promise); });
1082     });
1083   }};
1084   std::thread remoteThread1{[&]() {
1085     manager.addTaskRemote([&]() {
1086       result[1] = await(
1087           [&](Promise<int> promise) { savedPromise[1] = std::move(promise); });
1088     });
1089   }};
1090   remoteThread0.join();
1091   remoteThread1.join();
1092
1093   manager.loopUntilNoReady();
1094
1095   EXPECT_TRUE(savedPromise[0].hasValue());
1096   EXPECT_TRUE(savedPromise[1].hasValue());
1097   EXPECT_EQ(0, result[0]);
1098   EXPECT_EQ(0, result[1]);
1099
1100   savedPromise[0]->setValue(42);
1101   savedPromise[1]->setValue(43);
1102
1103   EXPECT_EQ(0, result[0]);
1104   EXPECT_EQ(0, result[1]);
1105
1106   manager.loopUntilNoReady();
1107   EXPECT_EQ(42, result[0]);
1108   EXPECT_EQ(43, result[1]);
1109 }
1110
1111 TEST(FiberManager, remoteHasTasks) {
1112   size_t counter = 0;
1113   FiberManager fm(folly::make_unique<SimpleLoopController>());
1114   std::thread remote([&]() { fm.addTaskRemote([&]() { ++counter; }); });
1115
1116   remote.join();
1117
1118   while (fm.hasTasks()) {
1119     fm.loopUntilNoReady();
1120   }
1121
1122   EXPECT_FALSE(fm.hasTasks());
1123   EXPECT_EQ(counter, 1);
1124 }
1125
1126 TEST(FiberManager, remoteHasReadyTasks) {
1127   int result = 0;
1128   folly::Optional<Promise<int>> savedPromise;
1129   FiberManager fm(folly::make_unique<SimpleLoopController>());
1130   std::thread remote([&]() {
1131     fm.addTaskRemote([&]() {
1132       result = await(
1133           [&](Promise<int> promise) { savedPromise = std::move(promise); });
1134       EXPECT_TRUE(fm.hasTasks());
1135     });
1136   });
1137
1138   remote.join();
1139   EXPECT_TRUE(fm.hasTasks());
1140
1141   fm.loopUntilNoReady();
1142   EXPECT_TRUE(fm.hasTasks());
1143
1144   std::thread remote2([&]() { savedPromise->setValue(47); });
1145   remote2.join();
1146   EXPECT_TRUE(fm.hasTasks());
1147
1148   fm.loopUntilNoReady();
1149   EXPECT_FALSE(fm.hasTasks());
1150
1151   EXPECT_EQ(result, 47);
1152 }
1153
1154 template <typename Data>
1155 void testFiberLocal() {
1156   FiberManager fm(
1157       LocalType<Data>(), folly::make_unique<SimpleLoopController>());
1158
1159   fm.addTask([]() {
1160     EXPECT_EQ(42, local<Data>().value);
1161
1162     local<Data>().value = 43;
1163
1164     addTask([]() {
1165       EXPECT_EQ(43, local<Data>().value);
1166
1167       local<Data>().value = 44;
1168
1169       addTask([]() { EXPECT_EQ(44, local<Data>().value); });
1170     });
1171   });
1172
1173   fm.addTask([&]() {
1174     EXPECT_EQ(42, local<Data>().value);
1175
1176     local<Data>().value = 43;
1177
1178     fm.addTaskRemote([]() { EXPECT_EQ(43, local<Data>().value); });
1179   });
1180
1181   fm.addTask([]() {
1182     EXPECT_EQ(42, local<Data>().value);
1183     local<Data>().value = 43;
1184
1185     auto task = []() {
1186       EXPECT_EQ(43, local<Data>().value);
1187       local<Data>().value = 44;
1188     };
1189     std::vector<std::function<void()>> tasks{task};
1190     collectAny(tasks.begin(), tasks.end());
1191
1192     EXPECT_EQ(43, local<Data>().value);
1193   });
1194
1195   fm.loopUntilNoReady();
1196   EXPECT_FALSE(fm.hasTasks());
1197 }
1198
1199 TEST(FiberManager, fiberLocal) {
1200   struct SimpleData {
1201     int value{42};
1202   };
1203
1204   testFiberLocal<SimpleData>();
1205 }
1206
1207 TEST(FiberManager, fiberLocalHeap) {
1208   struct LargeData {
1209     char _[1024 * 1024];
1210     int value{42};
1211   };
1212
1213   testFiberLocal<LargeData>();
1214 }
1215
1216 TEST(FiberManager, fiberLocalDestructor) {
1217   struct CrazyData {
1218     size_t data{42};
1219
1220     ~CrazyData() {
1221       if (data == 41) {
1222         addTask([]() {
1223           EXPECT_EQ(42, local<CrazyData>().data);
1224           // Make sure we don't have infinite loop
1225           local<CrazyData>().data = 0;
1226         });
1227       }
1228     }
1229   };
1230
1231   FiberManager fm(
1232       LocalType<CrazyData>(), folly::make_unique<SimpleLoopController>());
1233
1234   fm.addTask([]() { local<CrazyData>().data = 41; });
1235
1236   fm.loopUntilNoReady();
1237   EXPECT_FALSE(fm.hasTasks());
1238 }
1239
1240 TEST(FiberManager, yieldTest) {
1241   FiberManager manager(folly::make_unique<SimpleLoopController>());
1242   auto& loopController =
1243       dynamic_cast<SimpleLoopController&>(manager.loopController());
1244
1245   bool checkRan = false;
1246
1247   manager.addTask([&]() {
1248     manager.yield();
1249     checkRan = true;
1250   });
1251
1252   loopController.loop([&]() {
1253     if (checkRan) {
1254       loopController.stop();
1255     }
1256   });
1257
1258   EXPECT_TRUE(checkRan);
1259 }
1260
1261 TEST(FiberManager, RequestContext) {
1262   FiberManager fm(folly::make_unique<SimpleLoopController>());
1263
1264   bool checkRun1 = false;
1265   bool checkRun2 = false;
1266   bool checkRun3 = false;
1267   bool checkRun4 = false;
1268   folly::fibers::Baton baton1;
1269   folly::fibers::Baton baton2;
1270   folly::fibers::Baton baton3;
1271   folly::fibers::Baton baton4;
1272
1273   {
1274     folly::RequestContextScopeGuard rctx;
1275     auto rcontext1 = folly::RequestContext::get();
1276     fm.addTask([&, rcontext1]() {
1277       EXPECT_EQ(rcontext1, folly::RequestContext::get());
1278       baton1.wait(
1279           [&]() { EXPECT_EQ(rcontext1, folly::RequestContext::get()); });
1280       EXPECT_EQ(rcontext1, folly::RequestContext::get());
1281       runInMainContext(
1282           [&]() { EXPECT_EQ(rcontext1, folly::RequestContext::get()); });
1283       checkRun1 = true;
1284     });
1285   }
1286   {
1287     folly::RequestContextScopeGuard rctx;
1288     auto rcontext2 = folly::RequestContext::get();
1289     fm.addTaskRemote([&, rcontext2]() {
1290       EXPECT_EQ(rcontext2, folly::RequestContext::get());
1291       baton2.wait();
1292       EXPECT_EQ(rcontext2, folly::RequestContext::get());
1293       checkRun2 = true;
1294     });
1295   }
1296   {
1297     folly::RequestContextScopeGuard rctx;
1298     auto rcontext3 = folly::RequestContext::get();
1299     fm.addTaskFinally(
1300         [&, rcontext3]() {
1301           EXPECT_EQ(rcontext3, folly::RequestContext::get());
1302           baton3.wait();
1303           EXPECT_EQ(rcontext3, folly::RequestContext::get());
1304
1305           return folly::Unit();
1306         },
1307         [&, rcontext3](Try<folly::Unit>&& /* t */) {
1308           EXPECT_EQ(rcontext3, folly::RequestContext::get());
1309           checkRun3 = true;
1310         });
1311   }
1312   {
1313     folly::RequestContext::setContext(nullptr);
1314     fm.addTask([&]() {
1315       folly::RequestContextScopeGuard rctx;
1316       auto rcontext4 = folly::RequestContext::get();
1317       baton4.wait();
1318       EXPECT_EQ(rcontext4, folly::RequestContext::get());
1319       checkRun4 = true;
1320     });
1321   }
1322   {
1323     folly::RequestContextScopeGuard rctx;
1324     auto rcontext = folly::RequestContext::get();
1325
1326     fm.loopUntilNoReady();
1327     EXPECT_EQ(rcontext, folly::RequestContext::get());
1328
1329     baton1.post();
1330     EXPECT_EQ(rcontext, folly::RequestContext::get());
1331     fm.loopUntilNoReady();
1332     EXPECT_TRUE(checkRun1);
1333     EXPECT_EQ(rcontext, folly::RequestContext::get());
1334
1335     baton2.post();
1336     EXPECT_EQ(rcontext, folly::RequestContext::get());
1337     fm.loopUntilNoReady();
1338     EXPECT_TRUE(checkRun2);
1339     EXPECT_EQ(rcontext, folly::RequestContext::get());
1340
1341     baton3.post();
1342     EXPECT_EQ(rcontext, folly::RequestContext::get());
1343     fm.loopUntilNoReady();
1344     EXPECT_TRUE(checkRun3);
1345     EXPECT_EQ(rcontext, folly::RequestContext::get());
1346
1347     baton4.post();
1348     EXPECT_EQ(rcontext, folly::RequestContext::get());
1349     fm.loopUntilNoReady();
1350     EXPECT_TRUE(checkRun4);
1351     EXPECT_EQ(rcontext, folly::RequestContext::get());
1352   }
1353 }
1354
1355 TEST(FiberManager, resizePeriodically) {
1356   FiberManager::Options opts;
1357   opts.fibersPoolResizePeriodMs = 300;
1358   opts.maxFibersPoolSize = 5;
1359
1360   FiberManager manager(folly::make_unique<EventBaseLoopController>(), opts);
1361
1362   folly::EventBase evb;
1363   dynamic_cast<EventBaseLoopController&>(manager.loopController())
1364       .attachEventBase(evb);
1365
1366   std::vector<Baton> batons(10);
1367
1368   size_t tasksRun = 0;
1369   for (size_t i = 0; i < 30; ++i) {
1370     manager.addTask([i, &batons, &tasksRun]() {
1371       ++tasksRun;
1372       // Keep some fibers active indefinitely
1373       if (i < batons.size()) {
1374         batons[i].wait();
1375       }
1376     });
1377   }
1378
1379   EXPECT_EQ(0, tasksRun);
1380   EXPECT_EQ(30, manager.fibersAllocated());
1381   EXPECT_EQ(0, manager.fibersPoolSize());
1382
1383   evb.loopOnce();
1384   EXPECT_EQ(30, tasksRun);
1385   EXPECT_EQ(30, manager.fibersAllocated());
1386   // Can go over maxFibersPoolSize, 10 of 30 fibers still active
1387   EXPECT_EQ(20, manager.fibersPoolSize());
1388
1389   std::this_thread::sleep_for(std::chrono::milliseconds(400));
1390   evb.loopOnce(); // no fibers active in this period
1391   EXPECT_EQ(30, manager.fibersAllocated());
1392   EXPECT_EQ(20, manager.fibersPoolSize());
1393
1394   std::this_thread::sleep_for(std::chrono::milliseconds(400));
1395   evb.loopOnce(); // should shrink fibers pool to maxFibersPoolSize
1396   EXPECT_EQ(15, manager.fibersAllocated());
1397   EXPECT_EQ(5, manager.fibersPoolSize());
1398
1399   for (size_t i = 0; i < batons.size(); ++i) {
1400     batons[i].post();
1401   }
1402   evb.loopOnce();
1403   EXPECT_EQ(15, manager.fibersAllocated());
1404   EXPECT_EQ(15, manager.fibersPoolSize());
1405
1406   std::this_thread::sleep_for(std::chrono::milliseconds(400));
1407   evb.loopOnce(); // 10 fibers active in last period
1408   EXPECT_EQ(10, manager.fibersAllocated());
1409   EXPECT_EQ(10, manager.fibersPoolSize());
1410
1411   std::this_thread::sleep_for(std::chrono::milliseconds(400));
1412   evb.loopOnce();
1413   EXPECT_EQ(5, manager.fibersAllocated());
1414   EXPECT_EQ(5, manager.fibersPoolSize());
1415 }
1416
1417 TEST(FiberManager, batonWaitTimeoutHandler) {
1418   FiberManager manager(folly::make_unique<EventBaseLoopController>());
1419
1420   folly::EventBase evb;
1421   dynamic_cast<EventBaseLoopController&>(manager.loopController())
1422       .attachEventBase(evb);
1423
1424   size_t fibersRun = 0;
1425   Baton baton;
1426   Baton::TimeoutHandler timeoutHandler;
1427
1428   manager.addTask([&]() {
1429     baton.wait(timeoutHandler);
1430     ++fibersRun;
1431   });
1432   manager.loopUntilNoReady();
1433
1434   EXPECT_FALSE(baton.try_wait());
1435   EXPECT_EQ(0, fibersRun);
1436
1437   timeoutHandler.scheduleTimeout(std::chrono::milliseconds(250));
1438   std::this_thread::sleep_for(std::chrono::milliseconds(500));
1439
1440   EXPECT_FALSE(baton.try_wait());
1441   EXPECT_EQ(0, fibersRun);
1442
1443   evb.loopOnce();
1444   manager.loopUntilNoReady();
1445
1446   EXPECT_EQ(1, fibersRun);
1447 }
1448
1449 TEST(FiberManager, batonWaitTimeoutMany) {
1450   FiberManager manager(folly::make_unique<EventBaseLoopController>());
1451
1452   folly::EventBase evb;
1453   dynamic_cast<EventBaseLoopController&>(manager.loopController())
1454       .attachEventBase(evb);
1455
1456   constexpr size_t kNumTimeoutTasks = 10000;
1457   size_t tasksCount = kNumTimeoutTasks;
1458
1459   // We add many tasks to hit timeout queue deallocation logic.
1460   for (size_t i = 0; i < kNumTimeoutTasks; ++i) {
1461     manager.addTask([&]() {
1462       Baton baton;
1463       Baton::TimeoutHandler timeoutHandler;
1464
1465       folly::fibers::addTask([&] {
1466         timeoutHandler.scheduleTimeout(std::chrono::milliseconds(1000));
1467       });
1468
1469       baton.wait(timeoutHandler);
1470       if (--tasksCount == 0) {
1471         evb.terminateLoopSoon();
1472       }
1473     });
1474   }
1475
1476   evb.loopForever();
1477 }
1478
1479 TEST(FiberManager, remoteFutureTest) {
1480   FiberManager fiberManager(folly::make_unique<SimpleLoopController>());
1481   auto& loopController =
1482       dynamic_cast<SimpleLoopController&>(fiberManager.loopController());
1483
1484   int testValue1 = 5;
1485   int testValue2 = 7;
1486   auto f1 = fiberManager.addTaskFuture([&]() { return testValue1; });
1487   auto f2 = fiberManager.addTaskRemoteFuture([&]() { return testValue2; });
1488   loopController.loop([&]() { loopController.stop(); });
1489   auto v1 = f1.get();
1490   auto v2 = f2.get();
1491
1492   EXPECT_EQ(v1, testValue1);
1493   EXPECT_EQ(v2, testValue2);
1494 }
1495
1496 // Test that a void function produes a Future<Unit>.
1497 TEST(FiberManager, remoteFutureVoidUnitTest) {
1498   FiberManager fiberManager(folly::make_unique<SimpleLoopController>());
1499   auto& loopController =
1500       dynamic_cast<SimpleLoopController&>(fiberManager.loopController());
1501
1502   bool ranLocal = false;
1503   folly::Future<folly::Unit> futureLocal =
1504       fiberManager.addTaskFuture([&]() { ranLocal = true; });
1505
1506   bool ranRemote = false;
1507   folly::Future<folly::Unit> futureRemote =
1508       fiberManager.addTaskRemoteFuture([&]() { ranRemote = true; });
1509
1510   loopController.loop([&]() { loopController.stop(); });
1511
1512   futureLocal.wait();
1513   ASSERT_TRUE(ranLocal);
1514
1515   futureRemote.wait();
1516   ASSERT_TRUE(ranRemote);
1517 }
1518
1519 TEST(FiberManager, nestedFiberManagers) {
1520   folly::EventBase outerEvb;
1521   folly::EventBase innerEvb;
1522
1523   getFiberManager(outerEvb).addTask([&]() {
1524     EXPECT_EQ(
1525         &getFiberManager(outerEvb), FiberManager::getFiberManagerUnsafe());
1526
1527     runInMainContext([&]() {
1528       getFiberManager(innerEvb).addTask([&]() {
1529         EXPECT_EQ(
1530             &getFiberManager(innerEvb), FiberManager::getFiberManagerUnsafe());
1531
1532         innerEvb.terminateLoopSoon();
1533       });
1534
1535       innerEvb.loopForever();
1536     });
1537
1538     EXPECT_EQ(
1539         &getFiberManager(outerEvb), FiberManager::getFiberManagerUnsafe());
1540
1541     outerEvb.terminateLoopSoon();
1542   });
1543
1544   outerEvb.loopForever();
1545 }
1546
1547 TEST(FiberManager, semaphore) {
1548   constexpr size_t kTasks = 10;
1549   constexpr size_t kIterations = 10000;
1550   constexpr size_t kNumTokens = 10;
1551
1552   Semaphore sem(kNumTokens);
1553   int counterA = 0;
1554   int counterB = 0;
1555
1556   auto task = [&sem, kTasks, kIterations, kNumTokens](
1557       int& counter, folly::fibers::Baton& baton) {
1558     FiberManager manager(folly::make_unique<EventBaseLoopController>());
1559     folly::EventBase evb;
1560     dynamic_cast<EventBaseLoopController&>(manager.loopController())
1561         .attachEventBase(evb);
1562
1563     {
1564       std::shared_ptr<folly::EventBase> completionCounter(
1565           &evb, [](folly::EventBase* evb) { evb->terminateLoopSoon(); });
1566
1567       for (size_t i = 0; i < kTasks; ++i) {
1568         manager.addTask([&, completionCounter]() {
1569           for (size_t j = 0; j < kIterations; ++j) {
1570             sem.wait();
1571             ++counter;
1572             sem.signal();
1573             --counter;
1574
1575             EXPECT_LT(counter, kNumTokens);
1576             EXPECT_GE(counter, 0);
1577           }
1578         });
1579       }
1580
1581       baton.wait();
1582     }
1583     evb.loopForever();
1584   };
1585
1586   folly::fibers::Baton batonA;
1587   folly::fibers::Baton batonB;
1588   std::thread threadA([&] { task(counterA, batonA); });
1589   std::thread threadB([&] { task(counterB, batonB); });
1590
1591   batonA.post();
1592   batonB.post();
1593   threadA.join();
1594   threadB.join();
1595
1596   EXPECT_LT(counterA, kNumTokens);
1597   EXPECT_LT(counterB, kNumTokens);
1598   EXPECT_GE(counterA, 0);
1599   EXPECT_GE(counterB, 0);
1600 }
1601
1602 template <typename ExecutorT>
1603 void singleBatchDispatch(ExecutorT& executor, int batchSize, int index) {
1604   thread_local BatchDispatcher<int, std::string, ExecutorT> batchDispatcher(
1605       executor, [=](std::vector<int>&& batch) {
1606         EXPECT_EQ(batchSize, batch.size());
1607         std::vector<std::string> results;
1608         for (auto& it : batch) {
1609           results.push_back(folly::to<std::string>(it));
1610         }
1611         return results;
1612       });
1613
1614   auto indexCopy = index;
1615   auto result = batchDispatcher.add(std::move(indexCopy));
1616   EXPECT_EQ(folly::to<std::string>(index), result.get());
1617 }
1618
1619 TEST(FiberManager, batchDispatchTest) {
1620   folly::EventBase evb;
1621   auto& executor = getFiberManager(evb);
1622
1623   // Launch multiple fibers with a single id.
1624   executor.add([&]() {
1625     int batchSize = 10;
1626     for (int i = 0; i < batchSize; i++) {
1627       executor.add(
1628           [=, &executor]() { singleBatchDispatch(executor, batchSize, i); });
1629     }
1630   });
1631   evb.loop();
1632
1633   // Reuse the same BatchDispatcher to batch once again.
1634   executor.add([&]() {
1635     int batchSize = 10;
1636     for (int i = 0; i < batchSize; i++) {
1637       executor.add(
1638           [=, &executor]() { singleBatchDispatch(executor, batchSize, i); });
1639     }
1640   });
1641   evb.loop();
1642 }
1643
1644 template <typename ExecutorT>
1645 folly::Future<std::vector<std::string>> doubleBatchInnerDispatch(
1646     ExecutorT& executor,
1647     int totalNumberOfElements,
1648     std::vector<int> input) {
1649   thread_local BatchDispatcher<
1650       std::vector<int>,
1651       std::vector<std::string>,
1652       ExecutorT>
1653   batchDispatcher(executor, [=](std::vector<std::vector<int>>&& batch) {
1654     std::vector<std::vector<std::string>> results;
1655     int numberOfElements = 0;
1656     for (auto& unit : batch) {
1657       numberOfElements += unit.size();
1658       std::vector<std::string> result;
1659       for (auto& element : unit) {
1660         result.push_back(folly::to<std::string>(element));
1661       }
1662       results.push_back(std::move(result));
1663     }
1664     EXPECT_EQ(totalNumberOfElements, numberOfElements);
1665     return results;
1666   });
1667
1668   return batchDispatcher.add(std::move(input));
1669 }
1670
1671 /**
1672  * Batch values in groups of 5, and then call inner dispatch.
1673  */
1674 template <typename ExecutorT>
1675 void doubleBatchOuterDispatch(
1676     ExecutorT& executor,
1677     int totalNumberOfElements,
1678     int index) {
1679   thread_local BatchDispatcher<int, std::string, ExecutorT>
1680   batchDispatcher(executor, [=, &executor](std::vector<int>&& batch) {
1681     EXPECT_EQ(totalNumberOfElements, batch.size());
1682     std::vector<std::string> results;
1683     std::vector<folly::Future<std::vector<std::string>>>
1684         innerDispatchResultFutures;
1685
1686     std::vector<int> group;
1687     for (auto unit : batch) {
1688       group.push_back(unit);
1689       if (group.size() == 5) {
1690         auto localGroup = group;
1691         group.clear();
1692
1693         innerDispatchResultFutures.push_back(doubleBatchInnerDispatch(
1694             executor, totalNumberOfElements, localGroup));
1695       }
1696     }
1697
1698     folly::collectAll(
1699         innerDispatchResultFutures.begin(), innerDispatchResultFutures.end())
1700         .then([&](
1701             std::vector<Try<std::vector<std::string>>> innerDispatchResults) {
1702           for (auto& unit : innerDispatchResults) {
1703             for (auto& element : unit.value()) {
1704               results.push_back(element);
1705             }
1706           }
1707         })
1708         .get();
1709     return results;
1710   });
1711
1712   auto indexCopy = index;
1713   auto result = batchDispatcher.add(std::move(indexCopy));
1714   EXPECT_EQ(folly::to<std::string>(index), result.get());
1715 }
1716
1717 TEST(FiberManager, doubleBatchDispatchTest) {
1718   folly::EventBase evb;
1719   auto& executor = getFiberManager(evb);
1720
1721   // Launch multiple fibers with a single id.
1722   executor.add([&]() {
1723     int totalNumberOfElements = 20;
1724     for (int i = 0; i < totalNumberOfElements; i++) {
1725       executor.add([=, &executor]() {
1726         doubleBatchOuterDispatch(executor, totalNumberOfElements, i);
1727       });
1728     }
1729   });
1730   evb.loop();
1731 }
1732
1733 template <typename ExecutorT>
1734 void batchDispatchExceptionHandling(ExecutorT& executor, int i) {
1735   thread_local BatchDispatcher<int, int, ExecutorT> batchDispatcher(
1736       executor, [=, &executor](std::vector<int> &&) -> std::vector<int> {
1737         throw std::runtime_error("Surprise!!");
1738       });
1739
1740   EXPECT_THROW(batchDispatcher.add(i).get(), std::runtime_error);
1741 }
1742
1743 TEST(FiberManager, batchDispatchExceptionHandlingTest) {
1744   folly::EventBase evb;
1745   auto& executor = getFiberManager(evb);
1746
1747   // Launch multiple fibers with a single id.
1748   executor.add([&]() {
1749     int totalNumberOfElements = 5;
1750     for (int i = 0; i < totalNumberOfElements; i++) {
1751       executor.add(
1752           [=, &executor]() { batchDispatchExceptionHandling(executor, i); });
1753     }
1754   });
1755   evb.loop();
1756 }
1757
1758 namespace AtomicBatchDispatcherTesting {
1759
1760 using ValueT = size_t;
1761 using ResultT = std::string;
1762 using DispatchFunctionT =
1763     folly::Function<std::vector<ResultT>(std::vector<ValueT>&&)>;
1764
1765 #define ENABLE_TRACE_IN_TEST 0 // Set to 1 to debug issues in ABD tests
1766 #if ENABLE_TRACE_IN_TEST
1767 #define OUTPUT_TRACE std::cerr
1768 #else // ENABLE_TRACE_IN_TEST
1769 struct DevNullPiper {
1770   template <typename T>
1771   DevNullPiper& operator<<(const T&) {
1772     return *this;
1773   }
1774
1775   DevNullPiper& operator<<(std::ostream& (*)(std::ostream&)) {
1776     return *this;
1777   }
1778 } devNullPiper;
1779 #define OUTPUT_TRACE devNullPiper
1780 #endif // ENABLE_TRACE_IN_TEST
1781
1782 struct Job {
1783   AtomicBatchDispatcher<ValueT, ResultT>::Token token;
1784   ValueT input;
1785
1786   void preprocess(FiberManager& executor, bool die) {
1787     // Yield for a random duration [0, 10] ms to simulate I/O in preprocessing
1788     clock_t msecToDoIO = folly::Random::rand32() % 10;
1789     double start = (1000.0 * clock()) / CLOCKS_PER_SEC;
1790     double endAfter = start + msecToDoIO;
1791     while ((1000.0 * clock()) / CLOCKS_PER_SEC < endAfter) {
1792       executor.yield();
1793     }
1794     if (die) {
1795       throw std::logic_error("Simulating preprocessing failure");
1796     }
1797   }
1798
1799   Job(AtomicBatchDispatcher<ValueT, ResultT>::Token&& t, ValueT i)
1800       : token(std::move(t)), input(i) {}
1801
1802   Job(Job&&) = default;
1803   Job& operator=(Job&&) = default;
1804 };
1805
1806 ResultT processSingleInput(ValueT&& input) {
1807   return folly::to<ResultT>(std::move(input));
1808 }
1809
1810 std::vector<ResultT> userDispatchFunc(std::vector<ValueT>&& inputs) {
1811   size_t expectedCount = inputs.size();
1812   std::vector<ResultT> results;
1813   results.reserve(expectedCount);
1814   for (size_t i = 0; i < expectedCount; ++i) {
1815     results.emplace_back(processSingleInput(std::move(inputs[i])));
1816   }
1817   return results;
1818 }
1819
1820 void createJobs(
1821     AtomicBatchDispatcher<ValueT, ResultT>& atomicBatchDispatcher,
1822     std::vector<Job>& jobs,
1823     size_t count) {
1824   jobs.clear();
1825   for (size_t i = 0; i < count; ++i) {
1826     jobs.emplace_back(Job(atomicBatchDispatcher.getToken(), i));
1827   }
1828 }
1829
1830 enum class DispatchProblem {
1831   None,
1832   PreprocessThrows,
1833   DuplicateDispatch,
1834 };
1835
1836 void dispatchJobs(
1837     FiberManager& executor,
1838     std::vector<Job>& jobs,
1839     std::vector<folly::Optional<folly::Future<ResultT>>>& results,
1840     DispatchProblem dispatchProblem = DispatchProblem::None,
1841     size_t problemIndex = size_t(-1)) {
1842   EXPECT_TRUE(
1843       dispatchProblem == DispatchProblem::None || problemIndex < jobs.size());
1844   results.clear();
1845   results.resize(jobs.size());
1846   for (size_t i = 0; i < jobs.size(); ++i) {
1847     executor.add(
1848         [i, &executor, &jobs, &results, dispatchProblem, problemIndex]() {
1849           try {
1850             Job job(std::move(jobs[i]));
1851
1852             if (dispatchProblem == DispatchProblem::PreprocessThrows) {
1853               if (i == problemIndex) {
1854                 EXPECT_THROW(job.preprocess(executor, true), std::logic_error);
1855                 return;
1856               }
1857             }
1858
1859             job.preprocess(executor, false);
1860             OUTPUT_TRACE << "Dispatching job #" << i << std::endl;
1861             results[i] = job.token.dispatch(job.input);
1862             OUTPUT_TRACE << "Result future filled for job #" << i << std::endl;
1863
1864             if (dispatchProblem == DispatchProblem::DuplicateDispatch) {
1865               if (i == problemIndex) {
1866                 EXPECT_THROW(job.token.dispatch(job.input), std::logic_error);
1867               }
1868             }
1869           } catch (...) {
1870             OUTPUT_TRACE << "Preprocessing failed for job #" << i << std::endl;
1871           }
1872         });
1873   }
1874 }
1875
1876 void validateResult(
1877     std::vector<folly::Optional<folly::Future<ResultT>>>& results,
1878     size_t i) {
1879   try {
1880     OUTPUT_TRACE << "results[" << i << "].value() : " << results[i]->value()
1881                  << std::endl;
1882   } catch (std::exception& e) {
1883     OUTPUT_TRACE << "Exception : " << e.what() << std::endl;
1884     throw;
1885   }
1886 }
1887
1888 template <typename TException>
1889 void validateResults(
1890     std::vector<folly::Optional<folly::Future<ResultT>>>& results,
1891     size_t expectedNumResults) {
1892   size_t numResultsFilled = 0;
1893   for (size_t i = 0; i < results.size(); ++i) {
1894     if (!results[i]) {
1895       continue;
1896     }
1897     ++numResultsFilled;
1898     EXPECT_THROW(validateResult(results, i), TException);
1899   }
1900   EXPECT_EQ(numResultsFilled, expectedNumResults);
1901 }
1902
1903 void validateResults(
1904     std::vector<folly::Optional<folly::Future<ResultT>>>& results,
1905     size_t expectedNumResults) {
1906   size_t numResultsFilled = 0;
1907   for (size_t i = 0; i < results.size(); ++i) {
1908     if (!results[i]) {
1909       continue;
1910     }
1911     ++numResultsFilled;
1912     EXPECT_NO_THROW(validateResult(results, i));
1913     ValueT expectedInput = i;
1914     EXPECT_EQ(
1915         results[i]->value(), processSingleInput(std::move(expectedInput)));
1916   }
1917   EXPECT_EQ(numResultsFilled, expectedNumResults);
1918 }
1919
1920 } // AtomicBatchDispatcherTesting
1921
1922 #define SET_UP_TEST_FUNC                                        \
1923   using namespace AtomicBatchDispatcherTesting;                 \
1924   folly::EventBase evb;                                         \
1925   auto& executor = getFiberManager(evb);                        \
1926   const size_t COUNT = 11;                                      \
1927   std::vector<Job> jobs;                                        \
1928   jobs.reserve(COUNT);                                          \
1929   std::vector<folly::Optional<folly::Future<ResultT>>> results; \
1930   results.reserve(COUNT);                                       \
1931   DispatchFunctionT dispatchFunc
1932
1933 TEST(FiberManager, ABD_Test) {
1934   SET_UP_TEST_FUNC;
1935
1936   //
1937   // Testing AtomicBatchDispatcher with explicit call to commit()
1938   //
1939   dispatchFunc = userDispatchFunc;
1940   auto atomicBatchDispatcher =
1941       createAtomicBatchDispatcher(std::move(dispatchFunc));
1942   createJobs(atomicBatchDispatcher, jobs, COUNT);
1943   dispatchJobs(executor, jobs, results);
1944   atomicBatchDispatcher.commit();
1945   evb.loop();
1946   validateResults(results, COUNT);
1947 }
1948
1949 TEST(FiberManager, ABD_DispatcherDestroyedBeforeCallingCommit) {
1950   SET_UP_TEST_FUNC;
1951
1952   //
1953   // Testing AtomicBatchDispatcher destroyed before calling commit.
1954   // Handles error cases for:
1955   // - User might have forgotten to add the call to commit() in the code
1956   // - An unexpected exception got thrown in user code before commit() is called
1957   //
1958   try {
1959     dispatchFunc = userDispatchFunc;
1960     auto atomicBatchDispatcher =
1961         createAtomicBatchDispatcher(std::move(dispatchFunc));
1962     createJobs(atomicBatchDispatcher, jobs, COUNT);
1963     dispatchJobs(executor, jobs, results);
1964     throw std::runtime_error(
1965         "Unexpected exception in user code before commit called");
1966     atomicBatchDispatcher.commit();
1967   } catch (...) {
1968     /* User code handles the exception and does not exit process */
1969   }
1970   evb.loop();
1971   validateResults<std::logic_error>(results, COUNT);
1972 }
1973
1974 TEST(FiberManager, ABD_PreprocessingFailureTest) {
1975   SET_UP_TEST_FUNC;
1976
1977   //
1978   // Testing preprocessing failure on a job throws
1979   //
1980   dispatchFunc = userDispatchFunc;
1981   auto atomicBatchDispatcher =
1982       createAtomicBatchDispatcher(std::move(dispatchFunc));
1983   createJobs(atomicBatchDispatcher, jobs, COUNT);
1984   dispatchJobs(executor, jobs, results, DispatchProblem::PreprocessThrows, 8);
1985   atomicBatchDispatcher.commit();
1986   evb.loop();
1987   validateResults<std::logic_error>(results, COUNT - 1);
1988 }
1989
1990 TEST(FiberManager, ABD_MultipleDispatchOnSameTokenErrorTest) {
1991   SET_UP_TEST_FUNC;
1992
1993   //
1994   // Testing that calling dispatch more than once on the same token throws
1995   //
1996   dispatchFunc = userDispatchFunc;
1997   auto atomicBatchDispatcher =
1998       createAtomicBatchDispatcher(std::move(dispatchFunc));
1999   createJobs(atomicBatchDispatcher, jobs, COUNT);
2000   dispatchJobs(executor, jobs, results, DispatchProblem::DuplicateDispatch, 4);
2001   atomicBatchDispatcher.commit();
2002   evb.loop();
2003 }
2004
2005 TEST(FiberManager, ABD_GetTokenCalledAfterCommitTest) {
2006   SET_UP_TEST_FUNC;
2007
2008   //
2009   // Testing that exception set on attempt to call getToken after commit called
2010   //
2011   dispatchFunc = userDispatchFunc;
2012   auto atomicBatchDispatcher =
2013       createAtomicBatchDispatcher(std::move(dispatchFunc));
2014   createJobs(atomicBatchDispatcher, jobs, COUNT);
2015   atomicBatchDispatcher.commit();
2016   EXPECT_THROW(atomicBatchDispatcher.getToken(), std::logic_error);
2017   dispatchJobs(executor, jobs, results);
2018   EXPECT_THROW(atomicBatchDispatcher.getToken(), std::logic_error);
2019   evb.loop();
2020   validateResults(results, COUNT);
2021   EXPECT_THROW(atomicBatchDispatcher.getToken(), std::logic_error);
2022 }
2023
2024 TEST(FiberManager, ABD_UserProvidedBatchDispatchThrowsTest) {
2025   SET_UP_TEST_FUNC;
2026
2027   //
2028   // Testing that exception is set if user provided batch dispatch throws
2029   //
2030   dispatchFunc = [](std::vector<ValueT>&& inputs) -> std::vector<ResultT> {
2031     (void)userDispatchFunc(std::move(inputs));
2032     throw std::runtime_error("Unexpected exception in user dispatch function");
2033   };
2034   auto atomicBatchDispatcher =
2035       createAtomicBatchDispatcher(std::move(dispatchFunc));
2036   createJobs(atomicBatchDispatcher, jobs, COUNT);
2037   dispatchJobs(executor, jobs, results);
2038   atomicBatchDispatcher.commit();
2039   evb.loop();
2040   validateResults<std::runtime_error>(results, COUNT);
2041 }
2042
2043 TEST(FiberManager, VirtualEventBase) {
2044   folly::ScopedEventBaseThread thread;
2045
2046   auto evb1 =
2047       folly::make_unique<folly::VirtualEventBase>(*thread.getEventBase());
2048   auto evb2 =
2049       folly::make_unique<folly::VirtualEventBase>(*thread.getEventBase());
2050
2051   bool done1{false};
2052   bool done2{false};
2053
2054   getFiberManager(*evb1).addTaskRemote([&] {
2055     Baton baton;
2056     baton.timed_wait(std::chrono::milliseconds{100});
2057
2058     done1 = true;
2059   });
2060
2061   getFiberManager(*evb2).addTaskRemote([&] {
2062     Baton baton;
2063     baton.timed_wait(std::chrono::milliseconds{200});
2064
2065     done2 = true;
2066   });
2067
2068   evb1.reset();
2069   EXPECT_TRUE(done1);
2070
2071   evb2.reset();
2072   EXPECT_TRUE(done2);
2073 }
2074
2075 /**
2076  * Test that we can properly track fiber stack usage.
2077  *
2078  * This functionality can only be enabled when ASAN is disabled, so avoid
2079  * running this test with ASAN.
2080  */
2081 #ifndef FOLLY_SANITIZE_ADDRESS
2082 TEST(FiberManager, recordStack) {
2083   std::thread([] {
2084     folly::fibers::FiberManager::Options opts;
2085     opts.recordStackEvery = 1;
2086
2087     FiberManager fm(folly::make_unique<SimpleLoopController>(), opts);
2088     auto& loopController =
2089         dynamic_cast<SimpleLoopController&>(fm.loopController());
2090
2091     static constexpr size_t n = 1000;
2092     int s = 0;
2093     fm.addTask([&]() {
2094       int b[n] = {0};
2095       for (size_t i = 0; i < n; ++i) {
2096         b[i] = i;
2097       }
2098       for (size_t i = 0; i + 1 < n; ++i) {
2099         s += b[i] * b[i + 1];
2100       }
2101     });
2102
2103     (void)s;
2104
2105     loopController.loop([&]() { loopController.stop(); });
2106
2107     // Check that we properly accounted fiber stack usage.
2108     EXPECT_LT(n * sizeof(int), fm.stackHighWatermark());
2109   }).join();
2110 }
2111 #endif