2 * Copyright 2016 Facebook, Inc.
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
8 * http://www.apache.org/licenses/LICENSE-2.0
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
20 #include <gtest/gtest.h>
22 #include <folly/Benchmark.h>
23 #include <folly/Memory.h>
24 #include <folly/futures/Future.h>
26 #include <folly/fibers/AddTasks.h>
27 #include <folly/fibers/EventBaseLoopController.h>
28 #include <folly/fibers/FiberManager.h>
29 #include <folly/fibers/FiberManagerMap.h>
30 #include <folly/fibers/GenericBaton.h>
31 #include <folly/fibers/SimpleLoopController.h>
32 #include <folly/fibers/WhenN.h>
34 using namespace folly::fibers;
// Exercises Baton::timed_wait() under a SimpleLoopController.
// Two fibers call timed_wait() with 230 ms and 130 ms timeouts while the loop
// function sleeps 50 ms per pass; each fiber checks `iterations` after waking
// and then stops the loop controller.
// NOTE(review): `iterations` is presumably incremented once per loop pass and
// `baton` is declared in each task -- those lines are not shown here; confirm.
38 TEST(FiberManager, batonTimedWaitTimeout) {
39 bool taskAdded = false;
40 size_t iterations = 0;
42 FiberManager manager(folly::make_unique<SimpleLoopController>());
43 auto& loopController =
44 dynamic_cast<SimpleLoopController&>(manager.loopController());
46 auto loopFunc = [&]() {
48 manager.addTask([&]() {
// First fiber: 230 ms wait, expected to resume when iterations == 5.
51 auto res = baton.timed_wait(std::chrono::milliseconds(230));
54 EXPECT_EQ(5, iterations);
56 loopController.stop();
58 manager.addTask([&]() {
// Second fiber: 130 ms wait, expected to resume when iterations == 3.
61 auto res = baton.timed_wait(std::chrono::milliseconds(130));
64 EXPECT_EQ(3, iterations);
66 loopController.stop();
// Each loop pass costs 50 ms of wall-clock time.
70 std::this_thread::sleep_for(std::chrono::milliseconds(50));
75 loopController.loop(std::move(loopFunc));
// Exercises Baton::timed_wait() when the wait ends before its 130 ms timeout.
// The fiber expects to resume with iterations == 2; the loop function sleeps
// 50 ms per pass and has a branch that fires when iterations == 2.
// NOTE(review): that branch presumably posts the baton -- its body is not
// shown here; confirm.
78 TEST(FiberManager, batonTimedWaitPost) {
79 bool taskAdded = false;
80 size_t iterations = 0;
83 FiberManager manager(folly::make_unique<SimpleLoopController>());
84 auto& loopController =
85 dynamic_cast<SimpleLoopController&>(manager.loopController());
87 auto loopFunc = [&]() {
89 manager.addTask([&]() {
93 auto res = baton.timed_wait(std::chrono::milliseconds(130));
96 EXPECT_EQ(2, iterations);
98 loopController.stop();
// Each loop pass costs 50 ms of wall-clock time.
102 std::this_thread::sleep_for(std::chrono::milliseconds(50));
104 if (iterations == 2) {
110 loopController.loop(std::move(loopFunc));
// Exercises Baton::timed_wait() timeouts on an EventBase-driven FiberManager.
// Two fibers wait with 500 ms and 250 ms timeouts; each measures the elapsed
// time with EventBaseLoopController::Clock and expects it to lie within
// +/- 50 ms of the requested timeout. The second task to finish asks the
// EventBase to terminate, and the test expects both tasks to have completed.
113 TEST(FiberManager, batonTimedWaitTimeoutEvb) {
114 size_t tasksComplete = 0;
116 folly::EventBase evb;
118 FiberManager manager(folly::make_unique<EventBaseLoopController>());
119 dynamic_cast<EventBaseLoopController&>(manager.loopController())
120 .attachEventBase(evb);
// Shared task body, parameterised by the timeout in milliseconds.
122 auto task = [&](size_t timeout_ms) {
125 auto start = EventBaseLoopController::Clock::now();
126 auto res = baton.timed_wait(std::chrono::milliseconds(timeout_ms));
127 auto finish = EventBaseLoopController::Clock::now();
132 std::chrono::duration_cast<std::chrono::milliseconds>(finish - start);
// Allow 50 ms of slack on either side of the requested timeout.
134 EXPECT_GT(duration_ms.count(), timeout_ms - 50);
135 EXPECT_LT(duration_ms.count(), timeout_ms + 50);
137 if (++tasksComplete == 2) {
138 evb.terminateLoopSoon();
142 evb.runInEventBaseThread([&]() {
143 manager.addTask([&]() { task(500); });
144 manager.addTask([&]() { task(250); });
149 EXPECT_EQ(2, tasksComplete);
// Exercises Baton::timed_wait() being woken by post() on an EventBase-driven
// FiberManager. A delayed EventBase callback (delay argument 100) posts the
// baton while the fiber waits with a 130 ms timeout; the measured wait must be
// strictly between 95 ms and 110 ms. The task then terminates the loop and the
// test expects exactly one completed task.
152 TEST(FiberManager, batonTimedWaitPostEvb) {
153 size_t tasksComplete = 0;
155 folly::EventBase evb;
157 FiberManager manager(folly::make_unique<EventBaseLoopController>());
158 dynamic_cast<EventBaseLoopController&>(manager.loopController())
159 .attachEventBase(evb);
161 evb.runInEventBaseThread([&]() {
162 manager.addTask([&]() {
// Schedule the post before starting the timed wait.
165 evb.tryRunAfterDelay([&]() { baton.post(); }, 100);
167 auto start = EventBaseLoopController::Clock::now();
168 auto res = baton.timed_wait(std::chrono::milliseconds(130));
169 auto finish = EventBaseLoopController::Clock::now();
174 std::chrono::duration_cast<std::chrono::milliseconds>(finish - start);
176 EXPECT_TRUE(duration_ms.count() > 95 && duration_ms.count() < 110);
178 if (++tasksComplete == 1) {
179 evb.terminateLoopSoon();
186 EXPECT_EQ(1, tasksComplete);
// Exercises Baton::try_wait() in two scenarios on a SimpleLoopController:
//  1. a fiber polls b.try_wait() until it succeeds while a helper thread
//     sleeps 300 ms (presumably posting `b` afterwards -- not shown; confirm);
//  2. a fiber polls c.try_wait() a bounded number of times (`cnt`) with no
//     post, and expects try_wait() to still report false afterwards.
189 TEST(FiberManager, batonTryWait) {
190 FiberManager manager(folly::make_unique<SimpleLoopController>());
192 // Check if try_wait and post work as expected
195 manager.addTask([&]() {
196 while (!b.try_wait()) {
199 auto thr = std::thread([&]() {
200 std::this_thread::sleep_for(std::chrono::milliseconds(300));
204 manager.loopUntilNoReady();
209 // Check try_wait without post
210 manager.addTask([&]() {
212 while (cnt && !c.try_wait()) {
215 EXPECT_TRUE(!c.try_wait()); // must still hold
219 manager.loopUntilNoReady();
// Exercises a generic baton being waited on from inside a fiber.
// The fiber asserts it is running on an active fiber and clears
// `fiberRunning` at its end. The test expects the flag to be false before the
// first loop, true after it (the fiber is still alive), and then keeps
// looping until the fiber finishes while a helper thread sleeps 300 ms.
// NOTE(review): the helper thread presumably posts the baton after the sleep,
// and the fiber presumably sets fiberRunning = true before waiting -- those
// lines are not shown here; confirm.
222 TEST(FiberManager, genericBatonFiberWait) {
223 FiberManager manager(folly::make_unique<SimpleLoopController>());
226 bool fiberRunning = false;
228 manager.addTask([&]() {
229 EXPECT_EQ(manager.hasActiveFiber(), true);
232 fiberRunning = false;
235 EXPECT_FALSE(fiberRunning);
236 manager.loopUntilNoReady();
237 EXPECT_TRUE(fiberRunning); // ensure fiber still active
239 auto thr = std::thread([&]() {
240 std::this_thread::sleep_for(std::chrono::milliseconds(300));
244 while (fiberRunning) {
245 manager.loopUntilNoReady();
// Exercises a generic baton being waited on from a plain thread.
// The helper thread sets the atomic `threadWaiting` flag, then clears it
// again further down. The main thread spins until the flag is set, sleeps
// 300 ms, and then runs a fiber which asserts that it is on an active fiber
// and that the thread is still waiting; afterwards it spins until the flag
// clears.
// NOTE(review): the thread presumably blocks on the baton between the two
// flag writes and the fiber presumably posts it -- not shown here; confirm.
251 TEST(FiberManager, genericBatonThreadWait) {
252 FiberManager manager(folly::make_unique<SimpleLoopController>());
254 std::atomic<bool> threadWaiting(false);
256 auto thr = std::thread([&]() {
257 threadWaiting = true;
259 threadWaiting = false;
262 while (!threadWaiting) {
264 std::this_thread::sleep_for(std::chrono::milliseconds(300));
266 manager.addTask([&]() {
267 EXPECT_EQ(manager.hasActiveFiber(), true);
268 EXPECT_TRUE(threadWaiting);
270 while (threadWaiting) {
274 manager.loopUntilNoReady();
// Exercises addTasks() with tasks whose result type is move-only
// (std::unique_ptr<int>). Three tasks each park a Promise<int> in
// `pendingFibers` via await() and then return a unique_ptr holding i * 2 + 1.
// The consumer fiber drains the iterator, checking that each result matches
// 2 * taskID + 1 and that the number of still-pending fibers is bounded by
// 2 - n.
// The loop function fulfils one pending promise per pass (most recent first).
// NOTE(review): `n` is presumably the count of results consumed so far, and
// the loop presumably stops once nothing is pending -- not shown; confirm.
278 TEST(FiberManager, addTasksNoncopyable) {
279 std::vector<Promise<int>> pendingFibers;
280 bool taskAdded = false;
282 FiberManager manager(folly::make_unique<SimpleLoopController>());
283 auto& loopController =
284 dynamic_cast<SimpleLoopController&>(manager.loopController());
286 auto loopFunc = [&]() {
288 manager.addTask([&]() {
289 std::vector<std::function<std::unique_ptr<int>()>> funcs;
290 for (size_t i = 0; i < 3; ++i) {
291 funcs.push_back([i, &pendingFibers]() {
292 await([&pendingFibers](Promise<int> promise) {
293 pendingFibers.push_back(std::move(promise));
295 return folly::make_unique<int>(i * 2 + 1);
299 auto iter = addTasks(funcs.begin(), funcs.end());
302 while (iter.hasNext()) {
303 auto result = iter.awaitNext();
304 EXPECT_EQ(2 * iter.getTaskID() + 1, *result);
305 EXPECT_GE(2 - n, pendingFibers.size());
311 } else if (pendingFibers.size()) {
// Wake the most recently parked task.
312 pendingFibers.back().setValue(0);
313 pendingFibers.pop_back();
315 loopController.stop();
319 loopController.loop(std::move(loopFunc));
// Exercises await() when the callback passed to it throws.
// Two cases are visible: the callback throws ExpectedException directly, and
// the callback first moves the promise into an EventBase callback and then
// throws.
// NOTE(review): the assertions around each await() and the fiber/EventBase
// driving code are not shown here, so the exact expectation (presumably that
// ExpectedException propagates out of await()) should be confirmed.
322 TEST(FiberManager, awaitThrow) {
323 folly::EventBase evb;
324 struct ExpectedException {};
328 await([](Promise<int> p) {
330 throw ExpectedException();
336 await([&](Promise<int> p) {
337 evb.runInEventBaseThread([p = std::move(p)]() mutable {
340 throw ExpectedException();
// Exercises addTasks() with int-returning tasks, some of which throw
// std::runtime_error("Runtime"). Each task parks a Promise<int> in
// `pendingFibers` via await() first. While draining the iterator the consumer
// expects odd task IDs on the path where awaitNext() yields a value (which
// must equal 2 * taskID + 1) and even task IDs on the other path.
// The loop function fulfils one pending promise per pass (most recent first).
// NOTE(review): the condition selecting which tasks throw and the try/catch
// around awaitNext() are not shown here; the parity checks suggest even-ID
// tasks throw -- confirm.
347 TEST(FiberManager, addTasksThrow) {
348 std::vector<Promise<int>> pendingFibers;
349 bool taskAdded = false;
351 FiberManager manager(folly::make_unique<SimpleLoopController>());
352 auto& loopController =
353 dynamic_cast<SimpleLoopController&>(manager.loopController());
355 auto loopFunc = [&]() {
357 manager.addTask([&]() {
358 std::vector<std::function<int()>> funcs;
359 for (size_t i = 0; i < 3; ++i) {
360 funcs.push_back([i, &pendingFibers]() {
361 await([&pendingFibers](Promise<int> promise) {
362 pendingFibers.push_back(std::move(promise));
365 throw std::runtime_error("Runtime");
371 auto iter = addTasks(funcs.begin(), funcs.end());
374 while (iter.hasNext()) {
376 int result = iter.awaitNext();
377 EXPECT_EQ(1, iter.getTaskID() % 2);
378 EXPECT_EQ(2 * iter.getTaskID() + 1, result);
380 EXPECT_EQ(0, iter.getTaskID() % 2);
382 EXPECT_GE(2 - n, pendingFibers.size());
388 } else if (pendingFibers.size()) {
// Wake the most recently parked task.
389 pendingFibers.back().setValue(0);
390 pendingFibers.pop_back();
392 loopController.stop();
396 loopController.loop(std::move(loopFunc));
// Exercises addTasks() with void-returning tasks. Three tasks each park a
// Promise<int> in `pendingFibers` via await(); the consumer drains the
// iterator and checks that the number of still-pending fibers is bounded by
// 2 - n after each step.
// The loop function fulfils one pending promise per pass (most recent first).
// NOTE(review): `n` is presumably the count of tasks consumed so far -- its
// declaration is not shown here; confirm.
399 TEST(FiberManager, addTasksVoid) {
400 std::vector<Promise<int>> pendingFibers;
401 bool taskAdded = false;
403 FiberManager manager(folly::make_unique<SimpleLoopController>());
404 auto& loopController =
405 dynamic_cast<SimpleLoopController&>(manager.loopController());
407 auto loopFunc = [&]() {
409 manager.addTask([&]() {
410 std::vector<std::function<void()>> funcs;
411 for (size_t i = 0; i < 3; ++i) {
412 funcs.push_back([i, &pendingFibers]() {
413 await([&pendingFibers](Promise<int> promise) {
414 pendingFibers.push_back(std::move(promise));
419 auto iter = addTasks(funcs.begin(), funcs.end());
422 while (iter.hasNext()) {
424 EXPECT_GE(2 - n, pendingFibers.size());
430 } else if (pendingFibers.size()) {
// Wake the most recently parked task.
431 pendingFibers.back().setValue(0);
432 pendingFibers.pop_back();
434 loopController.stop();
438 loopController.loop(std::move(loopFunc));
// Exercises addTasks() with void-returning tasks, some of which throw
// std::runtime_error(""). Each task parks a Promise<int> in `pendingFibers`
// via await() first. While draining the iterator the consumer expects odd
// task IDs on one path and even task IDs on the other, and checks the pending
// count against 2 - n.
// The loop function fulfils one pending promise per pass (most recent first).
// NOTE(review): the throw condition and the try/catch separating the two
// parity checks are not shown here; presumably even-ID tasks throw -- confirm.
441 TEST(FiberManager, addTasksVoidThrow) {
442 std::vector<Promise<int>> pendingFibers;
443 bool taskAdded = false;
445 FiberManager manager(folly::make_unique<SimpleLoopController>());
446 auto& loopController =
447 dynamic_cast<SimpleLoopController&>(manager.loopController());
449 auto loopFunc = [&]() {
451 manager.addTask([&]() {
452 std::vector<std::function<void()>> funcs;
453 for (size_t i = 0; i < 3; ++i) {
454 funcs.push_back([i, &pendingFibers]() {
455 await([&pendingFibers](Promise<int> promise) {
456 pendingFibers.push_back(std::move(promise));
459 throw std::runtime_error("");
464 auto iter = addTasks(funcs.begin(), funcs.end());
467 while (iter.hasNext()) {
470 EXPECT_EQ(1, iter.getTaskID() % 2);
472 EXPECT_EQ(0, iter.getTaskID() % 2);
474 EXPECT_GE(2 - n, pendingFibers.size());
480 } else if (pendingFibers.size()) {
// Wake the most recently parked task.
481 pendingFibers.back().setValue(0);
482 pendingFibers.pop_back();
484 loopController.stop();
488 loopController.loop(std::move(loopFunc));
// Exercises the state queries of the addTasks() iterator (hasCompleted(),
// hasPending(), hasNext()) across four checkpoints for three void tasks that
// each park a Promise<int> in `pendingFibers` via await().
// Expected (hasCompleted, hasPending, hasNext) per checkpoint:
//   (true, true, true), (true, true, true), (false, true, true),
//   (false, false, false).
// The loop function fulfils one pending promise per pass (most recent first).
// NOTE(review): the iterator calls made between checkpoints (presumably
// reserve()/awaitNext()) are not shown here; confirm.
491 TEST(FiberManager, addTasksReserve) {
492 std::vector<Promise<int>> pendingFibers;
493 bool taskAdded = false;
495 FiberManager manager(folly::make_unique<SimpleLoopController>());
496 auto& loopController =
497 dynamic_cast<SimpleLoopController&>(manager.loopController());
499 auto loopFunc = [&]() {
501 manager.addTask([&]() {
502 std::vector<std::function<void()>> funcs;
503 for (size_t i = 0; i < 3; ++i) {
504 funcs.push_back([&pendingFibers]() {
505 await([&pendingFibers](Promise<int> promise) {
506 pendingFibers.push_back(std::move(promise));
511 auto iter = addTasks(funcs.begin(), funcs.end());
// Checkpoint 1.
514 EXPECT_TRUE(iter.hasCompleted());
515 EXPECT_TRUE(iter.hasPending());
516 EXPECT_TRUE(iter.hasNext());
// Checkpoint 2.
519 EXPECT_TRUE(iter.hasCompleted());
520 EXPECT_TRUE(iter.hasPending());
521 EXPECT_TRUE(iter.hasNext());
// Checkpoint 3.
524 EXPECT_FALSE(iter.hasCompleted());
525 EXPECT_TRUE(iter.hasPending());
526 EXPECT_TRUE(iter.hasNext());
// Checkpoint 4: nothing completed, pending, or left to consume.
529 EXPECT_FALSE(iter.hasCompleted());
530 EXPECT_FALSE(iter.hasPending());
531 EXPECT_FALSE(iter.hasNext());
534 } else if (pendingFibers.size()) {
// Wake the most recently parked task.
535 pendingFibers.back().setValue(0);
536 pendingFibers.pop_back();
538 loopController.stop();
542 loopController.loop(std::move(loopFunc));
// Exercises TaskIterator<size_t>::addTask(), i.e. adding tasks to an iterator
// after it has been created. Each task produced by makeTask(taskId) blocks on
// batons[taskId] before returning a size_t. Tasks 0 and 1 are added first, a
// result of 1 is expected, task 2 is then added, and results 2 and 0 are
// expected in that order.
// NOTE(review): the baton posts that fix this completion order and the task's
// return statement (presumably `return taskId;`) are not shown here; confirm.
545 TEST(FiberManager, addTaskDynamic) {
546 folly::EventBase evb;
550 auto makeTask = [&](size_t taskId) {
551 return [&, taskId]() -> size_t {
552 batons[taskId].wait();
558 .addTaskFuture([&]() {
559 TaskIterator<size_t> iterator;
561 iterator.addTask(makeTask(0));
562 iterator.addTask(makeTask(1));
566 EXPECT_EQ(1, iterator.awaitNext());
// A task added mid-iteration must be picked up by the same iterator.
568 iterator.addTask(makeTask(2));
572 EXPECT_EQ(2, iterator.awaitNext());
576 EXPECT_EQ(0, iterator.awaitNext());
// Exercises forEach(): three int-returning tasks each park a Promise<int> in
// `pendingFibers` via await(); the callback records every (id, result) pair.
// Afterwards the test expects three results, no pending fibers, and each
// result to equal id * 2 + 1.
// The loop function fulfils one pending promise per pass (most recent first).
// NOTE(review): the tasks' return statement is not shown here; the final
// check implies they return i * 2 + 1 -- confirm.
581 TEST(FiberManager, forEach) {
582 std::vector<Promise<int>> pendingFibers;
583 bool taskAdded = false;
585 FiberManager manager(folly::make_unique<SimpleLoopController>());
586 auto& loopController =
587 dynamic_cast<SimpleLoopController&>(manager.loopController());
589 auto loopFunc = [&]() {
591 manager.addTask([&]() {
592 std::vector<std::function<int()>> funcs;
593 for (size_t i = 0; i < 3; ++i) {
594 funcs.push_back([i, &pendingFibers]() {
595 await([&pendingFibers](Promise<int> promise) {
596 pendingFibers.push_back(std::move(promise));
602 std::vector<std::pair<size_t, int>> results;
603 forEach(funcs.begin(), funcs.end(), [&results](size_t id, int result) {
604 results.emplace_back(id, result);
606 EXPECT_EQ(3, results.size());
607 EXPECT_TRUE(pendingFibers.empty());
608 for (size_t i = 0; i < 3; ++i) {
609 EXPECT_EQ(results[i].first * 2 + 1, results[i].second);
613 } else if (pendingFibers.size()) {
// Wake the most recently parked task.
614 pendingFibers.back().setValue(0);
615 pendingFibers.pop_back();
617 loopController.stop();
621 loopController.loop(std::move(loopFunc));
// Exercises collectN() with n = 2 over three int-returning tasks, each of
// which parks a Promise<int> in `pendingFibers` via await(). The test expects
// exactly two results, exactly one fiber still pending when collectN()
// returns, and each result's value to equal (task index) * 2 + 1.
// The loop function fulfils one pending promise per pass (most recent first).
// NOTE(review): the tasks' return statement is not shown here; confirm it is
// i * 2 + 1.
624 TEST(FiberManager, collectN) {
625 std::vector<Promise<int>> pendingFibers;
626 bool taskAdded = false;
628 FiberManager manager(folly::make_unique<SimpleLoopController>());
629 auto& loopController =
630 dynamic_cast<SimpleLoopController&>(manager.loopController());
632 auto loopFunc = [&]() {
634 manager.addTask([&]() {
635 std::vector<std::function<int()>> funcs;
636 for (size_t i = 0; i < 3; ++i) {
637 funcs.push_back([i, &pendingFibers]() {
638 await([&pendingFibers](Promise<int> promise) {
639 pendingFibers.push_back(std::move(promise));
645 auto results = collectN(funcs.begin(), funcs.end(), 2);
646 EXPECT_EQ(2, results.size());
647 EXPECT_EQ(1, pendingFibers.size());
648 for (size_t i = 0; i < 2; ++i) {
649 EXPECT_EQ(results[i].first * 2 + 1, results[i].second);
653 } else if (pendingFibers.size()) {
// Wake the most recently parked task.
654 pendingFibers.back().setValue(0);
655 pendingFibers.pop_back();
657 loopController.stop();
661 loopController.loop(std::move(loopFunc));
// Exercises collectN() with n = 2 when the int-returning tasks throw
// std::runtime_error("Runtime") after parking a Promise<int> in
// `pendingFibers` via await(). One fiber is expected to be pending still
// after the collectN() call.
// The loop function fulfils one pending promise per pass (most recent first).
// NOTE(review): the try/catch presumably surrounding collectN() is not shown
// here; confirm which path the pending-count check sits on.
664 TEST(FiberManager, collectNThrow) {
665 std::vector<Promise<int>> pendingFibers;
666 bool taskAdded = false;
668 FiberManager manager(folly::make_unique<SimpleLoopController>());
669 auto& loopController =
670 dynamic_cast<SimpleLoopController&>(manager.loopController());
672 auto loopFunc = [&]() {
674 manager.addTask([&]() {
675 std::vector<std::function<int()>> funcs;
676 for (size_t i = 0; i < 3; ++i) {
677 funcs.push_back([i, &pendingFibers]() {
678 await([&pendingFibers](Promise<int> promise) {
679 pendingFibers.push_back(std::move(promise));
681 throw std::runtime_error("Runtime");
687 collectN(funcs.begin(), funcs.end(), 2);
689 EXPECT_EQ(1, pendingFibers.size());
693 } else if (pendingFibers.size()) {
// Wake the most recently parked task.
694 pendingFibers.back().setValue(0);
695 pendingFibers.pop_back();
697 loopController.stop();
701 loopController.loop(std::move(loopFunc));
// Exercises collectN() with n = 2 over three void-returning tasks, each of
// which parks a Promise<int> in `pendingFibers` via await(). The test expects
// two entries in the result and one fiber still pending when collectN()
// returns.
// The loop function fulfils one pending promise per pass (most recent first).
704 TEST(FiberManager, collectNVoid) {
705 std::vector<Promise<int>> pendingFibers;
706 bool taskAdded = false;
708 FiberManager manager(folly::make_unique<SimpleLoopController>());
709 auto& loopController =
710 dynamic_cast<SimpleLoopController&>(manager.loopController());
712 auto loopFunc = [&]() {
714 manager.addTask([&]() {
715 std::vector<std::function<void()>> funcs;
716 for (size_t i = 0; i < 3; ++i) {
717 funcs.push_back([i, &pendingFibers]() {
718 await([&pendingFibers](Promise<int> promise) {
719 pendingFibers.push_back(std::move(promise));
724 auto results = collectN(funcs.begin(), funcs.end(), 2);
725 EXPECT_EQ(2, results.size());
726 EXPECT_EQ(1, pendingFibers.size());
729 } else if (pendingFibers.size()) {
// Wake the most recently parked task.
730 pendingFibers.back().setValue(0);
731 pendingFibers.pop_back();
733 loopController.stop();
737 loopController.loop(std::move(loopFunc));
// Exercises collectN() with n = 2 when the void-returning tasks throw
// std::runtime_error("Runtime") after parking a Promise<int> in
// `pendingFibers` via await(). One fiber is expected to be pending still
// after the collectN() call.
// The loop function fulfils one pending promise per pass (most recent first).
// NOTE(review): the try/catch presumably surrounding collectN() is not shown
// here; confirm which path the pending-count check sits on.
740 TEST(FiberManager, collectNVoidThrow) {
741 std::vector<Promise<int>> pendingFibers;
742 bool taskAdded = false;
744 FiberManager manager(folly::make_unique<SimpleLoopController>());
745 auto& loopController =
746 dynamic_cast<SimpleLoopController&>(manager.loopController());
748 auto loopFunc = [&]() {
750 manager.addTask([&]() {
751 std::vector<std::function<void()>> funcs;
752 for (size_t i = 0; i < 3; ++i) {
753 funcs.push_back([i, &pendingFibers]() {
754 await([&pendingFibers](Promise<int> promise) {
755 pendingFibers.push_back(std::move(promise));
757 throw std::runtime_error("Runtime");
762 collectN(funcs.begin(), funcs.end(), 2);
764 EXPECT_EQ(1, pendingFibers.size());
768 } else if (pendingFibers.size()) {
// Wake the most recently parked task.
769 pendingFibers.back().setValue(0);
770 pendingFibers.pop_back();
772 loopController.stop();
776 loopController.loop(std::move(loopFunc));
// Exercises collectAll() over three int-returning tasks, each of which parks
// a Promise<int> in `pendingFibers` via await(). After collectAll() returns
// the test expects no pending fibers and results[i] == i * 2 + 1, i.e. the
// results are indexed by task position.
// The loop function fulfils one pending promise per pass (most recent first).
// NOTE(review): the tasks' return statement is not shown here; confirm it is
// i * 2 + 1.
779 TEST(FiberManager, collectAll) {
780 std::vector<Promise<int>> pendingFibers;
781 bool taskAdded = false;
783 FiberManager manager(folly::make_unique<SimpleLoopController>());
784 auto& loopController =
785 dynamic_cast<SimpleLoopController&>(manager.loopController());
787 auto loopFunc = [&]() {
789 manager.addTask([&]() {
790 std::vector<std::function<int()>> funcs;
791 for (size_t i = 0; i < 3; ++i) {
792 funcs.push_back([i, &pendingFibers]() {
793 await([&pendingFibers](Promise<int> promise) {
794 pendingFibers.push_back(std::move(promise));
800 auto results = collectAll(funcs.begin(), funcs.end());
801 EXPECT_TRUE(pendingFibers.empty());
802 for (size_t i = 0; i < 3; ++i) {
803 EXPECT_EQ(i * 2 + 1, results[i]);
807 } else if (pendingFibers.size()) {
// Wake the most recently parked task.
808 pendingFibers.back().setValue(0);
809 pendingFibers.pop_back();
811 loopController.stop();
815 loopController.loop(std::move(loopFunc));
// Exercises collectAll() over three void-returning tasks, each of which parks
// a Promise<int> in `pendingFibers` via await(). After collectAll() returns
// the test expects no fibers to be pending.
// The loop function fulfils one pending promise per pass (most recent first).
818 TEST(FiberManager, collectAllVoid) {
819 std::vector<Promise<int>> pendingFibers;
820 bool taskAdded = false;
822 FiberManager manager(folly::make_unique<SimpleLoopController>());
823 auto& loopController =
824 dynamic_cast<SimpleLoopController&>(manager.loopController());
826 auto loopFunc = [&]() {
828 manager.addTask([&]() {
829 std::vector<std::function<void()>> funcs;
830 for (size_t i = 0; i < 3; ++i) {
831 funcs.push_back([i, &pendingFibers]() {
832 await([&pendingFibers](Promise<int> promise) {
833 pendingFibers.push_back(std::move(promise));
838 collectAll(funcs.begin(), funcs.end());
839 EXPECT_TRUE(pendingFibers.empty());
842 } else if (pendingFibers.size()) {
// Wake the most recently parked task.
843 pendingFibers.back().setValue(0);
844 pendingFibers.pop_back();
846 loopController.stop();
850 loopController.loop(std::move(loopFunc));
// Exercises collectAny() over three int-returning tasks, each of which parks
// a Promise<int> in `pendingFibers` via await(). When collectAny() returns
// the test expects two fibers still pending and the winning result to be
// task index 2 with value 2 * 2 + 1.
// The loop function fulfils one pending promise per pass (most recent first),
// which is consistent with the last-added task (index 2) finishing first.
// NOTE(review): the condition guarding the throw and the tasks' return
// statement are not shown here; confirm which tasks throw.
853 TEST(FiberManager, collectAny) {
854 std::vector<Promise<int>> pendingFibers;
855 bool taskAdded = false;
857 FiberManager manager(folly::make_unique<SimpleLoopController>());
858 auto& loopController =
859 dynamic_cast<SimpleLoopController&>(manager.loopController());
861 auto loopFunc = [&]() {
863 manager.addTask([&]() {
864 std::vector<std::function<int()>> funcs;
865 for (size_t i = 0; i < 3; ++i) {
866 funcs.push_back([i, &pendingFibers]() {
867 await([&pendingFibers](Promise<int> promise) {
868 pendingFibers.push_back(std::move(promise));
871 throw std::runtime_error("This exception will be ignored");
877 auto result = collectAny(funcs.begin(), funcs.end());
878 EXPECT_EQ(2, pendingFibers.size());
879 EXPECT_EQ(2, result.first);
880 EXPECT_EQ(2 * 2 + 1, result.second);
883 } else if (pendingFibers.size()) {
// Wake the most recently parked task.
884 pendingFibers.back().setValue(0);
885 pendingFibers.pop_back();
887 loopController.stop();
891 loopController.loop(std::move(loopFunc));
895 /* Checks that this function was run from a main context,
896 by comparing an address on a stack to a known main stack address
897 and a known related fiber stack address. The assumption
898 is that fiber stack and main stack will be far enough apart,
899 while any two values on the same stack will be close. */
// Parameters:
//   ran           - out-flag owned by the caller (presumably set to true by
//                   this helper; the assignment is not shown here -- confirm).
//   mainLocation  - address of an int known to live on the main stack.
//   fiberLocation - address of an int known to live on a fiber stack; callers
//                   pass nullptr when no fiber address is available.
// Pointer differences are measured in units of int, so DISTANCE is 0x2000
// bytes expressed as a count of ints.
900 void expectMainContext(bool& ran, int* mainLocation, int* fiberLocation) {
902 /* 2 pages is a good guess */
903 constexpr ssize_t DISTANCE = 0x2000 / sizeof(int);
// `here` must be far from the fiber stack...
905 EXPECT_TRUE(std::abs(&here - fiberLocation) > DISTANCE);
// ...and close to the known main-stack address.
908 EXPECT_TRUE(std::abs(&here - mainLocation) < DISTANCE);
// Exercises runInMainContext() in two situations:
//  1. called on the manager from outside any fiber, where the callback is
//     expected to have run by the time the call returns;
//  2. called from inside a fiber, where the callback must execute on the main
//     stack (checked via expectMainContext against a fiber-stack address) and
//     its return value -- a struct A with a deleted copy constructor -- must
//     reach the fiber with value 42.
// NOTE(review): `checkRan` is presumably reset between the two phases; the
// reset is not shown here -- confirm.
916 TEST(FiberManager, runInMainContext) {
917 FiberManager manager(folly::make_unique<SimpleLoopController>());
918 auto& loopController =
919 dynamic_cast<SimpleLoopController&>(manager.loopController());
921 bool checkRan = false;
924 manager.runInMainContext(
925 [&]() { expectMainContext(checkRan, &mainLocation, nullptr); });
926 EXPECT_TRUE(checkRan);
930 manager.addTask([&]() {
932 explicit A(int value_) : value(value_) {}
933 A(const A&) = delete;
939 auto ret = runInMainContext([&]() {
940 expectMainContext(checkRan, &mainLocation, &stackLocation);
943 EXPECT_TRUE(checkRan);
944 EXPECT_EQ(42, ret.value);
// Single loop pass: the loop function stops the controller immediately.
947 loopController.loop([&]() { loopController.stop(); });
949 EXPECT_TRUE(checkRan);
// Exercises addTaskFinally(): the task returns 1234 and the `finally`
// callback receives it as a Try<int>, checks the value, and verifies through
// expectMainContext that it is running on the main stack. The callback must
// not have run before the loop is driven and must have run afterwards.
952 TEST(FiberManager, addTaskFinally) {
953 FiberManager manager(folly::make_unique<SimpleLoopController>());
954 auto& loopController =
955 dynamic_cast<SimpleLoopController&>(manager.loopController());
957 bool checkRan = false;
961 manager.addTaskFinally(
962 [&]() { return 1234; },
963 [&](Try<int>&& result) {
964 EXPECT_EQ(result.value(), 1234);
966 expectMainContext(checkRan, &mainLocation, nullptr);
// Nothing runs until the loop is driven.
969 EXPECT_FALSE(checkRan);
971 loopController.loop([&]() { loopController.stop(); });
973 EXPECT_TRUE(checkRan);
// Checks fiber pooling when the task count equals maxFibersPoolSize (5).
// Five tasks are run, then five more; after each round the test expects
// fibersAllocated() and fibersPoolSize() to both be 5, and the running total
// of executed tasks to be 5 and then 10.
976 TEST(FiberManager, fibersPoolWithinLimit) {
977 FiberManager::Options opts;
978 opts.maxFibersPoolSize = 5;
980 FiberManager manager(folly::make_unique<SimpleLoopController>(), opts);
981 auto& loopController =
982 dynamic_cast<SimpleLoopController&>(manager.loopController());
984 size_t fibersRun = 0;
// Round 1.
986 for (size_t i = 0; i < 5; ++i) {
987 manager.addTask([&]() { ++fibersRun; });
989 loopController.loop([&]() { loopController.stop(); });
991 EXPECT_EQ(5, fibersRun);
992 EXPECT_EQ(5, manager.fibersAllocated());
993 EXPECT_EQ(5, manager.fibersPoolSize());
// Round 2: the allocation count is expected to stay at 5.
995 for (size_t i = 0; i < 5; ++i) {
996 manager.addTask([&]() { ++fibersRun; });
998 loopController.loop([&]() { loopController.stop(); });
1000 EXPECT_EQ(10, fibersRun);
1001 EXPECT_EQ(5, manager.fibersAllocated());
1002 EXPECT_EQ(5, manager.fibersPoolSize());
// Checks fiber pooling when more tasks (10) are queued than
// maxFibersPoolSize (5). Before the loop runs, 10 fibers are allocated, none
// has run, and the pool is empty; after the loop, all 10 tasks have run and
// both fibersAllocated() and fibersPoolSize() are expected to be 5.
1005 TEST(FiberManager, fibersPoolOverLimit) {
1006 FiberManager::Options opts;
1007 opts.maxFibersPoolSize = 5;
1009 FiberManager manager(folly::make_unique<SimpleLoopController>(), opts);
1010 auto& loopController =
1011 dynamic_cast<SimpleLoopController&>(manager.loopController());
1013 size_t fibersRun = 0;
1015 for (size_t i = 0; i < 10; ++i) {
1016 manager.addTask([&]() { ++fibersRun; });
// State after queuing but before running anything.
1019 EXPECT_EQ(0, fibersRun);
1020 EXPECT_EQ(10, manager.fibersAllocated());
1021 EXPECT_EQ(0, manager.fibersPoolSize());
1023 loopController.loop([&]() { loopController.stop(); });
// State after all tasks have run.
1025 EXPECT_EQ(10, fibersRun);
1026 EXPECT_EQ(5, manager.fibersAllocated());
1027 EXPECT_EQ(5, manager.fibersPoolSize());
// Checks that fibers can be woken from other threads. Two fibers save their
// promises into `savedPromise`; two helper threads then fulfil them with 42
// and 43. The results must stay 0 until the manager loop runs again, the loop
// controller must report exactly one remote schedule for both wake-ups, and
// after the next loop the results must be 42 and 43.
// NOTE(review): each task presumably assigns result[i] from an await() call;
// those lines are not shown here -- confirm.
1030 TEST(FiberManager, remoteFiberBasic) {
1031 FiberManager manager(folly::make_unique<SimpleLoopController>());
1032 auto& loopController =
1033 dynamic_cast<SimpleLoopController&>(manager.loopController());
1036 result[0] = result[1] = 0;
1037 folly::Optional<Promise<int>> savedPromise[2];
1038 manager.addTask([&]() {
1040 [&](Promise<int> promise) { savedPromise[0] = std::move(promise); });
1042 manager.addTask([&]() {
1044 [&](Promise<int> promise) { savedPromise[1] = std::move(promise); });
1047 manager.loopUntilNoReady();
// Both fibers have parked their promises but produced no result yet.
1049 EXPECT_TRUE(savedPromise[0].hasValue());
1050 EXPECT_TRUE(savedPromise[1].hasValue());
1051 EXPECT_EQ(0, result[0]);
1052 EXPECT_EQ(0, result[1]);
// Fulfil the promises from two other threads.
1054 std::thread remoteThread0{[&]() { savedPromise[0]->setValue(42); }};
1055 std::thread remoteThread1{[&]() { savedPromise[1]->setValue(43); }};
1056 remoteThread0.join();
1057 remoteThread1.join();
// Fulfilment alone must not run the fibers.
1058 EXPECT_EQ(0, result[0]);
1059 EXPECT_EQ(0, result[1]);
1060 /* Should only have scheduled once */
1061 EXPECT_EQ(1, loopController.remoteScheduleCalled());
1063 manager.loopUntilNoReady();
1064 EXPECT_EQ(42, result[0]);
1065 EXPECT_EQ(43, result[1]);
// Checks addTaskRemote(): two helper threads each enqueue a task that saves
// its promise into `savedPromise`. After the threads are joined and the loop
// has run, both promises must be set and the results still 0. The promises
// are then fulfilled with 42 and 43 from the test thread; the results must
// not change until the loop runs again.
// NOTE(review): each task presumably assigns result[i] from an await() call;
// those lines are not shown here -- confirm.
1068 TEST(FiberManager, addTaskRemoteBasic) {
1069 FiberManager manager(folly::make_unique<SimpleLoopController>());
1072 result[0] = result[1] = 0;
1073 folly::Optional<Promise<int>> savedPromise[2];
1075 std::thread remoteThread0{[&]() {
1076 manager.addTaskRemote([&]() {
1078 [&](Promise<int> promise) { savedPromise[0] = std::move(promise); });
1081 std::thread remoteThread1{[&]() {
1082 manager.addTaskRemote([&]() {
1084 [&](Promise<int> promise) { savedPromise[1] = std::move(promise); });
1087 remoteThread0.join();
1088 remoteThread1.join();
1090 manager.loopUntilNoReady();
1092 EXPECT_TRUE(savedPromise[0].hasValue());
1093 EXPECT_TRUE(savedPromise[1].hasValue());
1094 EXPECT_EQ(0, result[0]);
1095 EXPECT_EQ(0, result[1]);
1097 savedPromise[0]->setValue(42);
1098 savedPromise[1]->setValue(43);
// Fulfilment alone must not run the fibers.
1100 EXPECT_EQ(0, result[0]);
1101 EXPECT_EQ(0, result[1]);
1103 manager.loopUntilNoReady();
1104 EXPECT_EQ(42, result[0]);
1105 EXPECT_EQ(43, result[1]);
// Checks that hasTasks() accounts for tasks added with addTaskRemote(): a
// helper thread enqueues a task that increments `counter`; the test loops
// while hasTasks() is true, then expects no tasks left and counter == 1.
// NOTE(review): the declaration of `counter` and the join of `remote` are not
// shown here.
1108 TEST(FiberManager, remoteHasTasks) {
1110 FiberManager fm(folly::make_unique<SimpleLoopController>());
1111 std::thread remote([&]() { fm.addTaskRemote([&]() { ++counter; }); });
1115 while (fm.hasTasks()) {
1116 fm.loopUntilNoReady();
1119 EXPECT_FALSE(fm.hasTasks());
1120 EXPECT_EQ(counter, 1);
// Checks hasTasks() across the lifetime of a remotely added task that blocks
// on a promise. hasTasks() is expected to be true inside the task, after it
// has been enqueued, after the first loop (the fiber is parked on its
// promise), and after a second thread fulfils the promise with 47; it becomes
// false only once the loop has run the fiber to completion, at which point
// `result` must be 47.
// NOTE(review): the declaration of `result` and the thread joins are not
// shown here.
1123 TEST(FiberManager, remoteHasReadyTasks) {
1125 folly::Optional<Promise<int>> savedPromise;
1126 FiberManager fm(folly::make_unique<SimpleLoopController>());
1127 std::thread remote([&]() {
1128 fm.addTaskRemote([&]() {
1130 [&](Promise<int> promise) { savedPromise = std::move(promise); });
1131 EXPECT_TRUE(fm.hasTasks());
1136 EXPECT_TRUE(fm.hasTasks());
1138 fm.loopUntilNoReady();
1139 EXPECT_TRUE(fm.hasTasks());
1141 std::thread remote2([&]() { savedPromise->setValue(47); });
1143 EXPECT_TRUE(fm.hasTasks());
1145 fm.loopUntilNoReady();
1146 EXPECT_FALSE(fm.hasTasks());
1148 EXPECT_EQ(result, 47);
// Shared scenario for the fiber-local tests, parameterised on the local data
// type `Data` (which must expose a `value` member whose initial value the
// checks expect to be 42).
// Visible checks: a fresh fiber starts at 42; a value written within a fiber
// is read back unchanged; a task added from inside a fiber after the parent
// set 44 expects to read 44; a task added with addTaskRemote() after the
// enclosing fiber set 43 expects to read 43; and a value of 43 is still 43
// after a collectAny() call.
// NOTE(review): the addTask() calls that delimit each sub-scenario and the
// definition of `task` are not shown here, so the grouping above is inferred
// from the order of the visible statements -- confirm.
1151 template <typename Data>
1152 void testFiberLocal() {
1154 LocalType<Data>(), folly::make_unique<SimpleLoopController>());
1157 EXPECT_EQ(42, local<Data>().value);
1159 local<Data>().value = 43;
1162 EXPECT_EQ(43, local<Data>().value);
1164 local<Data>().value = 44;
1166 addTask([]() { EXPECT_EQ(44, local<Data>().value); });
1171 EXPECT_EQ(42, local<Data>().value);
1173 local<Data>().value = 43;
1175 fm.addTaskRemote([]() { EXPECT_EQ(43, local<Data>().value); });
1179 EXPECT_EQ(42, local<Data>().value);
1180 local<Data>().value = 43;
1183 EXPECT_EQ(43, local<Data>().value);
1184 local<Data>().value = 44;
1186 std::vector<std::function<void()>> tasks{task};
1187 collectAny(tasks.begin(), tasks.end());
1189 EXPECT_EQ(43, local<Data>().value);
1192 fm.loopUntilNoReady();
1193 EXPECT_FALSE(fm.hasTasks());
// Runs the shared testFiberLocal scenario with SimpleData as the fiber-local
// type (SimpleData's definition is not shown here).
1196 TEST(FiberManager, fiberLocal) {
1201 testFiberLocal<SimpleData>();
// Runs the shared testFiberLocal scenario with LargeData as the fiber-local
// type. The 1 MiB char array below is presumably a member of LargeData that
// makes it too large for inline storage (hence "Heap") -- the enclosing
// struct declaration is not shown here; confirm.
1204 TEST(FiberManager, fiberLocalHeap) {
1206 char _[1024 * 1024];
1210 testFiberLocal<LargeData>();
// Exercises fiber-local access involving the CrazyData type: some code path
// expects local<CrazyData>().data to be 42 and resets it to 0 to avoid
// looping forever, while the fiber task itself sets data to 41.
// NOTE(review): the CrazyData definition is not shown here. Judging by the
// test name, the 42-check presumably lives in CrazyData's destructor, which
// would then be re-entering the fiber-local storage during teardown --
// confirm against the full source.
1213 TEST(FiberManager, fiberLocalDestructor) {
1220 EXPECT_EQ(42, local<CrazyData>().data);
1221 // Make sure we don't have infinite loop
1222 local<CrazyData>().data = 0;
1229 LocalType<CrazyData>(), folly::make_unique<SimpleLoopController>());
1231 fm.addTask([]() { local<CrazyData>().data = 41; });
1233 fm.loopUntilNoReady();
1234 EXPECT_FALSE(fm.hasTasks());
// Exercises yielding from a fiber under a SimpleLoopController: a task is
// added, the loop is driven until the loop function stops it, and `checkRan`
// must be true afterwards.
// NOTE(review): the task body (presumably a yield() followed by setting
// checkRan) and the condition under which the loop function stops are not
// shown here; confirm.
1237 TEST(FiberManager, yieldTest) {
1238 FiberManager manager(folly::make_unique<SimpleLoopController>());
1239 auto& loopController =
1240 dynamic_cast<SimpleLoopController&>(manager.loopController());
1242 bool checkRan = false;
1244 manager.addTask([&]() {
1249 loopController.loop([&]() {
1251 loopController.stop();
1255 EXPECT_TRUE(checkRan);
// Checks that each fiber task runs under the folly::RequestContext that was
// current when the task was created, and that driving the FiberManager does
// not disturb the caller's own context.
// Four tasks are created under four distinct contexts (rcontext1..4); each
// asserts it sees its own context at several points, including inside
// baton.wait() callbacks and, for the third task, inside the `finally`
// callback. The test thread then installs a fifth context (`rcontext`) and
// repeatedly drives the loop, asserting after every step that its own context
// is still current and that each task has completed in turn.
// NOTE(review): the task-creation calls for tasks 1, 3 and 4, the lines that
// set checkRun1..4, and the baton posts between loop runs are not shown here;
// confirm.
1258 TEST(FiberManager, RequestContext) {
1259 FiberManager fm(folly::make_unique<SimpleLoopController>());
1261 bool checkRun1 = false;
1262 bool checkRun2 = false;
1263 bool checkRun3 = false;
1264 bool checkRun4 = false;
1265 folly::fibers::Baton baton1;
1266 folly::fibers::Baton baton2;
1267 folly::fibers::Baton baton3;
1268 folly::fibers::Baton baton4;
// Task 1: created under rcontext1.
1270 folly::RequestContext::create();
1271 auto rcontext1 = folly::RequestContext::get();
1273 EXPECT_EQ(rcontext1, folly::RequestContext::get());
1274 baton1.wait([&]() { EXPECT_EQ(rcontext1, folly::RequestContext::get()); });
1275 EXPECT_EQ(rcontext1, folly::RequestContext::get());
1277 [&]() { EXPECT_EQ(rcontext1, folly::RequestContext::get()); });
// Task 2: created under rcontext2 via addTaskRemote().
1281 folly::RequestContext::create();
1282 auto rcontext2 = folly::RequestContext::get();
1283 fm.addTaskRemote([&]() {
1284 EXPECT_EQ(rcontext2, folly::RequestContext::get());
1286 EXPECT_EQ(rcontext2, folly::RequestContext::get());
// Task 3: created under rcontext3; also checks the finally callback.
1290 folly::RequestContext::create();
1291 auto rcontext3 = folly::RequestContext::get();
1294 EXPECT_EQ(rcontext3, folly::RequestContext::get());
1296 EXPECT_EQ(rcontext3, folly::RequestContext::get());
1298 return folly::Unit();
1300 [&](Try<folly::Unit>&& /* t */) {
1301 EXPECT_EQ(rcontext3, folly::RequestContext::get());
// Clear the current context before creating the one for task 4.
1305 folly::RequestContext::setContext(nullptr);
1307 folly::RequestContext::create();
1308 auto rcontext4 = folly::RequestContext::get();
1310 EXPECT_EQ(rcontext4, folly::RequestContext::get());
// Context owned by the test thread itself; must survive every loop run.
1314 folly::RequestContext::create();
1315 auto rcontext = folly::RequestContext::get();
1317 fm.loopUntilNoReady();
1318 EXPECT_EQ(rcontext, folly::RequestContext::get());
1321 EXPECT_EQ(rcontext, folly::RequestContext::get());
1322 fm.loopUntilNoReady();
1323 EXPECT_TRUE(checkRun1);
1324 EXPECT_EQ(rcontext, folly::RequestContext::get());
1327 EXPECT_EQ(rcontext, folly::RequestContext::get());
1328 fm.loopUntilNoReady();
1329 EXPECT_TRUE(checkRun2);
1330 EXPECT_EQ(rcontext, folly::RequestContext::get());
1333 EXPECT_EQ(rcontext, folly::RequestContext::get());
1334 fm.loopUntilNoReady();
1335 EXPECT_TRUE(checkRun3);
1336 EXPECT_EQ(rcontext, folly::RequestContext::get());
1339 EXPECT_EQ(rcontext, folly::RequestContext::get());
1340 fm.loopUntilNoReady();
1341 EXPECT_TRUE(checkRun4);
1342 EXPECT_EQ(rcontext, folly::RequestContext::get());
// Checks periodic shrinking of the fiber pool with
// fibersPoolResizePeriodMs = 300 and maxFibersPoolSize = 5 on an
// EventBase-driven manager. Thirty tasks are queued; the first ten
// (i < batons.size()) take a branch that keeps them active. The test then
// alternates 400 ms sleeps with evb.loopOnce() and checks the expected
// fibersAllocated()/fibersPoolSize() pairs in order:
//   30/0 (queued), 30/20 (run, 10 still active), 30/20, 15/5,
//   15/15 (after the baton loop), 10/10, 5/5.
// NOTE(review): the bodies of the task branch and of the final baton loop
// (presumably wait()/post()) and some loop-driving calls are not shown here;
// confirm.
// NOTE(review): this test depends on wall-clock sleeps, so the expectations
// assume each 400 ms sleep spans exactly one resize period.
1345 TEST(FiberManager, resizePeriodically) {
1346 FiberManager::Options opts;
1347 opts.fibersPoolResizePeriodMs = 300;
1348 opts.maxFibersPoolSize = 5;
1350 FiberManager manager(folly::make_unique<EventBaseLoopController>(), opts);
1352 folly::EventBase evb;
1353 dynamic_cast<EventBaseLoopController&>(manager.loopController())
1354 .attachEventBase(evb);
1356 std::vector<Baton> batons(10);
1358 size_t tasksRun = 0;
1359 for (size_t i = 0; i < 30; ++i) {
1360 manager.addTask([i, &batons, &tasksRun]() {
1362 // Keep some fibers active indefinitely
1363 if (i < batons.size()) {
1369 EXPECT_EQ(0, tasksRun);
1370 EXPECT_EQ(30, manager.fibersAllocated());
1371 EXPECT_EQ(0, manager.fibersPoolSize());
1374 EXPECT_EQ(30, tasksRun);
1375 EXPECT_EQ(30, manager.fibersAllocated());
1376 // Can go over maxFibersPoolSize, 10 of 30 fibers still active
1377 EXPECT_EQ(20, manager.fibersPoolSize());
1379 std::this_thread::sleep_for(std::chrono::milliseconds(400));
1380 evb.loopOnce(); // no fibers active in this period
1381 EXPECT_EQ(30, manager.fibersAllocated());
1382 EXPECT_EQ(20, manager.fibersPoolSize());
1384 std::this_thread::sleep_for(std::chrono::milliseconds(400));
1385 evb.loopOnce(); // should shrink fibers pool to maxFibersPoolSize
1386 EXPECT_EQ(15, manager.fibersAllocated());
1387 EXPECT_EQ(5, manager.fibersPoolSize());
1389 for (size_t i = 0; i < batons.size(); ++i) {
1393 EXPECT_EQ(15, manager.fibersAllocated());
1394 EXPECT_EQ(15, manager.fibersPoolSize());
1396 std::this_thread::sleep_for(std::chrono::milliseconds(400));
1397 evb.loopOnce(); // 10 fibers active in last period
1398 EXPECT_EQ(10, manager.fibersAllocated());
1399 EXPECT_EQ(10, manager.fibersPoolSize());
1401 std::this_thread::sleep_for(std::chrono::milliseconds(400));
1403 EXPECT_EQ(5, manager.fibersAllocated());
1404 EXPECT_EQ(5, manager.fibersPoolSize());
// Exercises Baton::wait(TimeoutHandler&) on an EventBase-driven manager.
// A fiber waits on the baton with `timeoutHandler`. After the first loop the
// baton is not signalled and the fiber has not finished. A 250 ms timeout is
// then scheduled and the test sleeps 500 ms; because nothing has driven the
// event loop yet, the baton is still unsignalled and fibersRun is still 0.
// After the final loop run the fiber is expected to have completed once.
// NOTE(review): the line that drives the EventBase before the final
// loopUntilNoReady() (presumably evb.loopOnce()) and the fiber's
// ++fibersRun are not shown here; confirm.
1407 TEST(FiberManager, batonWaitTimeoutHandler) {
1408 FiberManager manager(folly::make_unique<EventBaseLoopController>());
1410 folly::EventBase evb;
1411 dynamic_cast<EventBaseLoopController&>(manager.loopController())
1412 .attachEventBase(evb);
1414 size_t fibersRun = 0;
1416 Baton::TimeoutHandler timeoutHandler;
1418 manager.addTask([&]() {
1419 baton.wait(timeoutHandler);
1422 manager.loopUntilNoReady();
1424 EXPECT_FALSE(baton.try_wait());
1425 EXPECT_EQ(0, fibersRun);
1427 timeoutHandler.scheduleTimeout(std::chrono::milliseconds(250));
1428 std::this_thread::sleep_for(std::chrono::milliseconds(500));
1430 EXPECT_FALSE(baton.try_wait());
1431 EXPECT_EQ(0, fibersRun);
1434 manager.loopUntilNoReady();
1436 EXPECT_EQ(1, fibersRun);
// Stress test for timeout-handler bookkeeping: 10000 fibers each create a
// Baton::TimeoutHandler, spawn a nested fiber that schedules a 1000 ms
// timeout on it, and then wait on a baton with that handler. The fiber that
// brings `tasksCount` down to zero asks the EventBase to terminate.
// NOTE(review): the baton declaration and the call that runs the event loop
// are not shown here.
1439 TEST(FiberManager, batonWaitTimeoutMany) {
1440 FiberManager manager(folly::make_unique<EventBaseLoopController>());
1442 folly::EventBase evb;
1443 dynamic_cast<EventBaseLoopController&>(manager.loopController())
1444 .attachEventBase(evb);
1446 constexpr size_t kNumTimeoutTasks = 10000;
1447 size_t tasksCount = kNumTimeoutTasks;
1449 // We add many tasks to hit timeout queue deallocation logic.
1450 for (size_t i = 0; i < kNumTimeoutTasks; ++i) {
1451 manager.addTask([&]() {
1453 Baton::TimeoutHandler timeoutHandler;
// The timeout is armed from a separate, nested fiber.
1455 folly::fibers::addTask([&] {
1456 timeoutHandler.scheduleTimeout(std::chrono::milliseconds(1000));
1459 baton.wait(timeoutHandler);
// Last fiber to wake up ends the event loop.
1460 if (--tasksCount == 0) {
1461 evb.terminateLoopSoon();
// Checks that values returned by tasks added with addTaskFuture() and
// addTaskRemoteFuture() match the values the tasks returned.
// NOTE(review): the definitions of testValue1/testValue2 and v1/v2 are not
// visible in this view; presumably v1/v2 are read from f1/f2 — confirm.
1469 TEST(FiberManager, remoteFutureTest) {
1470 FiberManager fiberManager(folly::make_unique<SimpleLoopController>());
1471 auto& loopController =
1472 dynamic_cast<SimpleLoopController&>(fiberManager.loopController());
// One task is added through the local API, one through the remote API.
1476 auto f1 = fiberManager.addTaskFuture([&]() { return testValue1; });
1477 auto f2 = fiberManager.addTaskRemoteFuture([&]() { return testValue2; });
// The loop callback asks the controller to stop as soon as it is invoked.
1478 loopController.loop([&]() { loopController.stop(); });
1482 EXPECT_EQ(v1, testValue1);
1483 EXPECT_EQ(v2, testValue2);
1486 // Test that a void function produces a Future<Unit>.
// Adds void-returning lambdas through addTaskFuture() and
// addTaskRemoteFuture(), stores the results as folly::Future<folly::Unit>,
// and checks that both lambdas actually ran.
1487 TEST(FiberManager, remoteFutureVoidUnitTest) {
1488 FiberManager fiberManager(folly::make_unique<SimpleLoopController>());
1489 auto& loopController =
1490 dynamic_cast<SimpleLoopController&>(fiberManager.loopController());
// Local variant: the explicit Future<Unit> type on the left-hand side is the
// compile-time part of the check.
1492 bool ranLocal = false;
1493 folly::Future<folly::Unit> futureLocal =
1494 fiberManager.addTaskFuture([&]() { ranLocal = true; });
// Remote variant of the same check.
1496 bool ranRemote = false;
1497 folly::Future<folly::Unit> futureRemote =
1498 fiberManager.addTaskRemoteFuture([&]() { ranRemote = true; });
// The loop callback asks the controller to stop as soon as it is invoked.
1500 loopController.loop([&]() { loopController.stop(); });
// NOTE(review): a wait on futureLocal presumably precedes this assertion, as
// it does for futureRemote below; that line is not visible here — confirm.
1503 ASSERT_TRUE(ranLocal);
1505 futureRemote.wait();
1506 ASSERT_TRUE(ranRemote);
// Runs a second EventBase/FiberManager pair from inside a task of a first one
// and compares FiberManager::getFiberManagerUnsafe() against the expected
// manager at each level.
// NOTE(review): the lines that open each comparison (the macro name) are not
// visible in this view; only their argument lines are — confirm the macro.
1509 TEST(FiberManager, nestedFiberManagers) {
1510 folly::EventBase outerEvb;
1511 folly::EventBase innerEvb;
1513 getFiberManager(outerEvb).addTask([&]() {
// Inside the outer task: compared against the outer manager.
1515 &getFiberManager(outerEvb), FiberManager::getFiberManagerUnsafe());
// Switch to the main context to add a task to the inner manager.
1517 runInMainContext([&]() {
1518 getFiberManager(innerEvb).addTask([&]() {
// Inside the inner task: compared against the inner manager.
1520 &getFiberManager(innerEvb), FiberManager::getFiberManagerUnsafe());
1522 innerEvb.terminateLoopSoon();
// Drive the inner event base; the inner task above asks it to terminate.
// NOTE(review): presumably still inside the runInMainContext callback — the
// closing braces are not visible here.
1525 innerEvb.loopForever();
// Compared against the outer manager a second time.
1529 &getFiberManager(outerEvb), FiberManager::getFiberManagerUnsafe());
1531 outerEvb.terminateLoopSoon();
// Drive the outer event base; the outer task asks it to terminate.
1534 outerEvb.loopForever();
// Number of await() calls each benchmark task performs; set by runBenchmark()
// and read inside the task lambda.
1537 static size_t sNumAwaits;
// Benchmark driver: adds tasks that each call await() `numAwaits` times, and
// fulfils the queued promises from the loop callback.
//
// numAwaits: stored in sNumAwaits; the per-task await() count.
// toSend:    pre-decremented in the loop callback; the loop controller is
//            stopped when it reaches 0.
1539 void runBenchmark(size_t numAwaits, size_t toSend) {
1540 sNumAwaits = numAwaits;
1542 FiberManager fiberManager(folly::make_unique<SimpleLoopController>());
1543 auto& loopController =
1544 dynamic_cast<SimpleLoopController&>(fiberManager.loopController());
// Promises handed out by await() that have not been fulfilled yet.
1546 std::queue<Promise<int>> pendingRequests;
1547 static const size_t maxOutstanding = 5;
1549 auto loop = [&fiberManager, &loopController, &pendingRequests, &toSend]() {
// When the queue holds maxOutstanding promises, or nothing is left to send,
// fulfil the oldest pending promise with 0.
// NOTE(review): the body of the empty-queue check and the line separating this
// branch from the addTask path are not visible here; presumably an early
// return and an `else` — confirm.
1550 if (pendingRequests.size() == maxOutstanding || toSend == 0) {
1551 if (pendingRequests.empty()) {
1554 pendingRequests.front().setValue(0);
1555 pendingRequests.pop();
// Each task performs sNumAwaits awaits; every await pushes its promise onto
// pendingRequests for the loop callback to fulfil.
1557 fiberManager.addTask([&pendingRequests]() {
1558 for (size_t i = 0; i < sNumAwaits; ++i) {
1559 auto result = await([&pendingRequests](Promise<int> promise) {
1560 pendingRequests.push(std::move(promise));
1562 DCHECK_EQ(result, 0);
// toSend and loopController are captured only by the outer `loop` lambda, so
// this check belongs to the loop callback, not to the task body.
1566 if (--toSend == 0) {
1567 loopController.stop();
1572 loopController.loop(std::move(loop));
// Benchmark: runBenchmark with one await per task and `iters` as toSend.
1575 BENCHMARK(FiberManagerBasicOneAwait, iters) {
1576 runBenchmark(1, iters);
// Benchmark: runBenchmark with five awaits per task and `iters` as toSend.
1579 BENCHMARK(FiberManagerBasicFiveAwaits, iters) {
1580 runBenchmark(5, iters);
// Benchmark: per iteration, constructs a fresh folly::EventBase, obtains its
// fiber manager through getFiberManager(), and adds one empty task.
// NOTE(review): whatever runs the task / event base is not visible in this
// view — confirm against the file.
1583 BENCHMARK(FiberManagerCreateDestroy, iters) {
1584 for (size_t i = 0; i < iters; ++i) {
1585 folly::EventBase evb;
1586 auto& fm = folly::fibers::getFiberManager(evb);
1587 fm.addTask([]() {});
// Benchmark: with maxFibersPoolSize set to 0, adds kNumAllocations trivial
// tasks per iteration and expects fibersPoolSize() to be 0 before and after.
1592 BENCHMARK(FiberManagerAllocateDeallocatePattern, iters) {
1593 static const size_t kNumAllocations = 10000;
1595 FiberManager::Options opts;
1596 opts.maxFibersPoolSize = 0;
1598 FiberManager fiberManager(folly::make_unique<SimpleLoopController>(), opts);
1600 for (size_t iter = 0; iter < iters; ++iter) {
1601 EXPECT_EQ(0, fiberManager.fibersPoolSize());
1603 size_t fibersRun = 0;
// NOTE(review): loopUntilNoReady() appears to sit inside this loop, i.e. each
// task is run right after it is added; the closing brace is not visible here
// — confirm.
1605 for (size_t i = 0; i < kNumAllocations; ++i) {
1606 fiberManager.addTask([&fibersRun] { ++fibersRun; });
1607 fiberManager.loopUntilNoReady();
// NOTE(review): the literal 10000 duplicates kNumAllocations; the two must be
// kept in sync by hand.
1610 EXPECT_EQ(10000, fibersRun);
1611 EXPECT_EQ(0, fiberManager.fibersPoolSize());
// Benchmark: with maxFibersPoolSize set to 0, adds kNumAllocations trivial
// tasks per iteration and expects fibersPoolSize() to be 0 before and after.
1615 BENCHMARK(FiberManagerAllocateLargeChunk, iters) {
1616 static const size_t kNumAllocations = 10000;
1618 FiberManager::Options opts;
1619 opts.maxFibersPoolSize = 0;
1621 FiberManager fiberManager(folly::make_unique<SimpleLoopController>(), opts);
1623 for (size_t iter = 0; iter < iters; ++iter) {
1624 EXPECT_EQ(0, fiberManager.fibersPoolSize());
1626 size_t fibersRun = 0;
// Queue every task first.
1628 for (size_t i = 0; i < kNumAllocations; ++i) {
1629 fiberManager.addTask([&fibersRun] { ++fibersRun; });
// NOTE(review): this call appears to come after the loop above, so all queued
// tasks run in one pass; the closing brace is not visible here — confirm.
1632 fiberManager.loopUntilNoReady();
// NOTE(review): the literal 10000 duplicates kNumAllocations; the two must be
// kept in sync by hand.
1634 EXPECT_EQ(10000, fibersRun);
1635 EXPECT_EQ(0, fiberManager.fibersPoolSize());