2 * Copyright 2016 Facebook, Inc.
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
8 * http://www.apache.org/licenses/LICENSE-2.0
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
20 #include <gtest/gtest.h>
22 #include <folly/Memory.h>
23 #include <folly/futures/Future.h>
25 #include <folly/fibers/AddTasks.h>
26 #include <folly/fibers/EventBaseLoopController.h>
27 #include <folly/fibers/FiberManager.h>
28 #include <folly/fibers/FiberManagerMap.h>
29 #include <folly/fibers/GenericBaton.h>
30 #include <folly/fibers/SimpleLoopController.h>
31 #include <folly/fibers/WhenN.h>
33 using namespace folly::fibers;
37 TEST(FiberManager, batonTimedWaitTimeout) {
38 bool taskAdded = false;
39 size_t iterations = 0;
41 FiberManager manager(folly::make_unique<SimpleLoopController>());
42 auto& loopController =
43 dynamic_cast<SimpleLoopController&>(manager.loopController());
45 auto loopFunc = [&]() {
47 manager.addTask([&]() {
50 auto res = baton.timed_wait(std::chrono::milliseconds(230));
53 EXPECT_EQ(5, iterations);
55 loopController.stop();
57 manager.addTask([&]() {
60 auto res = baton.timed_wait(std::chrono::milliseconds(130));
63 EXPECT_EQ(3, iterations);
65 loopController.stop();
69 std::this_thread::sleep_for(std::chrono::milliseconds(50));
74 loopController.loop(std::move(loopFunc));
77 TEST(FiberManager, batonTimedWaitPost) {
78 bool taskAdded = false;
79 size_t iterations = 0;
82 FiberManager manager(folly::make_unique<SimpleLoopController>());
83 auto& loopController =
84 dynamic_cast<SimpleLoopController&>(manager.loopController());
86 auto loopFunc = [&]() {
88 manager.addTask([&]() {
92 auto res = baton.timed_wait(std::chrono::milliseconds(130));
95 EXPECT_EQ(2, iterations);
97 loopController.stop();
101 std::this_thread::sleep_for(std::chrono::milliseconds(50));
103 if (iterations == 2) {
109 loopController.loop(std::move(loopFunc));
112 TEST(FiberManager, batonTimedWaitTimeoutEvb) {
113 size_t tasksComplete = 0;
115 folly::EventBase evb;
117 FiberManager manager(folly::make_unique<EventBaseLoopController>());
118 dynamic_cast<EventBaseLoopController&>(manager.loopController())
119 .attachEventBase(evb);
121 auto task = [&](size_t timeout_ms) {
124 auto start = EventBaseLoopController::Clock::now();
125 auto res = baton.timed_wait(std::chrono::milliseconds(timeout_ms));
126 auto finish = EventBaseLoopController::Clock::now();
131 std::chrono::duration_cast<std::chrono::milliseconds>(finish - start);
133 EXPECT_GT(duration_ms.count(), timeout_ms - 50);
134 EXPECT_LT(duration_ms.count(), timeout_ms + 50);
136 if (++tasksComplete == 2) {
137 evb.terminateLoopSoon();
141 evb.runInEventBaseThread([&]() {
142 manager.addTask([&]() { task(500); });
143 manager.addTask([&]() { task(250); });
148 EXPECT_EQ(2, tasksComplete);
151 TEST(FiberManager, batonTimedWaitPostEvb) {
152 size_t tasksComplete = 0;
154 folly::EventBase evb;
156 FiberManager manager(folly::make_unique<EventBaseLoopController>());
157 dynamic_cast<EventBaseLoopController&>(manager.loopController())
158 .attachEventBase(evb);
160 evb.runInEventBaseThread([&]() {
161 manager.addTask([&]() {
164 evb.tryRunAfterDelay([&]() { baton.post(); }, 100);
166 auto start = EventBaseLoopController::Clock::now();
167 auto res = baton.timed_wait(std::chrono::milliseconds(130));
168 auto finish = EventBaseLoopController::Clock::now();
173 std::chrono::duration_cast<std::chrono::milliseconds>(finish - start);
175 EXPECT_TRUE(duration_ms.count() > 95 && duration_ms.count() < 110);
177 if (++tasksComplete == 1) {
178 evb.terminateLoopSoon();
185 EXPECT_EQ(1, tasksComplete);
188 TEST(FiberManager, batonTryWait) {
189 FiberManager manager(folly::make_unique<SimpleLoopController>());
191 // Check if try_wait and post work as expected
194 manager.addTask([&]() {
195 while (!b.try_wait()) {
198 auto thr = std::thread([&]() {
199 std::this_thread::sleep_for(std::chrono::milliseconds(300));
203 manager.loopUntilNoReady();
208 // Check try_wait without post
209 manager.addTask([&]() {
211 while (cnt && !c.try_wait()) {
214 EXPECT_TRUE(!c.try_wait()); // must still hold
218 manager.loopUntilNoReady();
221 TEST(FiberManager, genericBatonFiberWait) {
222 FiberManager manager(folly::make_unique<SimpleLoopController>());
225 bool fiberRunning = false;
227 manager.addTask([&]() {
228 EXPECT_EQ(manager.hasActiveFiber(), true);
231 fiberRunning = false;
234 EXPECT_FALSE(fiberRunning);
235 manager.loopUntilNoReady();
236 EXPECT_TRUE(fiberRunning); // ensure fiber still active
238 auto thr = std::thread([&]() {
239 std::this_thread::sleep_for(std::chrono::milliseconds(300));
243 while (fiberRunning) {
244 manager.loopUntilNoReady();
250 TEST(FiberManager, genericBatonThreadWait) {
251 FiberManager manager(folly::make_unique<SimpleLoopController>());
253 std::atomic<bool> threadWaiting(false);
255 auto thr = std::thread([&]() {
256 threadWaiting = true;
258 threadWaiting = false;
261 while (!threadWaiting) {
263 std::this_thread::sleep_for(std::chrono::milliseconds(300));
265 manager.addTask([&]() {
266 EXPECT_EQ(manager.hasActiveFiber(), true);
267 EXPECT_TRUE(threadWaiting);
269 while (threadWaiting) {
273 manager.loopUntilNoReady();
277 TEST(FiberManager, addTasksNoncopyable) {
278 std::vector<Promise<int>> pendingFibers;
279 bool taskAdded = false;
281 FiberManager manager(folly::make_unique<SimpleLoopController>());
282 auto& loopController =
283 dynamic_cast<SimpleLoopController&>(manager.loopController());
285 auto loopFunc = [&]() {
287 manager.addTask([&]() {
288 std::vector<std::function<std::unique_ptr<int>()>> funcs;
289 for (size_t i = 0; i < 3; ++i) {
290 funcs.push_back([i, &pendingFibers]() {
291 await([&pendingFibers](Promise<int> promise) {
292 pendingFibers.push_back(std::move(promise));
294 return folly::make_unique<int>(i * 2 + 1);
298 auto iter = addTasks(funcs.begin(), funcs.end());
301 while (iter.hasNext()) {
302 auto result = iter.awaitNext();
303 EXPECT_EQ(2 * iter.getTaskID() + 1, *result);
304 EXPECT_GE(2 - n, pendingFibers.size());
310 } else if (pendingFibers.size()) {
311 pendingFibers.back().setValue(0);
312 pendingFibers.pop_back();
314 loopController.stop();
318 loopController.loop(std::move(loopFunc));
321 TEST(FiberManager, awaitThrow) {
322 folly::EventBase evb;
323 struct ExpectedException {};
327 await([](Promise<int> p) {
329 throw ExpectedException();
335 await([&](Promise<int> p) {
336 evb.runInEventBaseThread([p = std::move(p)]() mutable {
339 throw ExpectedException();
346 TEST(FiberManager, addTasksThrow) {
347 std::vector<Promise<int>> pendingFibers;
348 bool taskAdded = false;
350 FiberManager manager(folly::make_unique<SimpleLoopController>());
351 auto& loopController =
352 dynamic_cast<SimpleLoopController&>(manager.loopController());
354 auto loopFunc = [&]() {
356 manager.addTask([&]() {
357 std::vector<std::function<int()>> funcs;
358 for (size_t i = 0; i < 3; ++i) {
359 funcs.push_back([i, &pendingFibers]() {
360 await([&pendingFibers](Promise<int> promise) {
361 pendingFibers.push_back(std::move(promise));
364 throw std::runtime_error("Runtime");
370 auto iter = addTasks(funcs.begin(), funcs.end());
373 while (iter.hasNext()) {
375 int result = iter.awaitNext();
376 EXPECT_EQ(1, iter.getTaskID() % 2);
377 EXPECT_EQ(2 * iter.getTaskID() + 1, result);
379 EXPECT_EQ(0, iter.getTaskID() % 2);
381 EXPECT_GE(2 - n, pendingFibers.size());
387 } else if (pendingFibers.size()) {
388 pendingFibers.back().setValue(0);
389 pendingFibers.pop_back();
391 loopController.stop();
395 loopController.loop(std::move(loopFunc));
398 TEST(FiberManager, addTasksVoid) {
399 std::vector<Promise<int>> pendingFibers;
400 bool taskAdded = false;
402 FiberManager manager(folly::make_unique<SimpleLoopController>());
403 auto& loopController =
404 dynamic_cast<SimpleLoopController&>(manager.loopController());
406 auto loopFunc = [&]() {
408 manager.addTask([&]() {
409 std::vector<std::function<void()>> funcs;
410 for (size_t i = 0; i < 3; ++i) {
411 funcs.push_back([i, &pendingFibers]() {
412 await([&pendingFibers](Promise<int> promise) {
413 pendingFibers.push_back(std::move(promise));
418 auto iter = addTasks(funcs.begin(), funcs.end());
421 while (iter.hasNext()) {
423 EXPECT_GE(2 - n, pendingFibers.size());
429 } else if (pendingFibers.size()) {
430 pendingFibers.back().setValue(0);
431 pendingFibers.pop_back();
433 loopController.stop();
437 loopController.loop(std::move(loopFunc));
440 TEST(FiberManager, addTasksVoidThrow) {
441 std::vector<Promise<int>> pendingFibers;
442 bool taskAdded = false;
444 FiberManager manager(folly::make_unique<SimpleLoopController>());
445 auto& loopController =
446 dynamic_cast<SimpleLoopController&>(manager.loopController());
448 auto loopFunc = [&]() {
450 manager.addTask([&]() {
451 std::vector<std::function<void()>> funcs;
452 for (size_t i = 0; i < 3; ++i) {
453 funcs.push_back([i, &pendingFibers]() {
454 await([&pendingFibers](Promise<int> promise) {
455 pendingFibers.push_back(std::move(promise));
458 throw std::runtime_error("");
463 auto iter = addTasks(funcs.begin(), funcs.end());
466 while (iter.hasNext()) {
469 EXPECT_EQ(1, iter.getTaskID() % 2);
471 EXPECT_EQ(0, iter.getTaskID() % 2);
473 EXPECT_GE(2 - n, pendingFibers.size());
479 } else if (pendingFibers.size()) {
480 pendingFibers.back().setValue(0);
481 pendingFibers.pop_back();
483 loopController.stop();
487 loopController.loop(std::move(loopFunc));
490 TEST(FiberManager, addTasksReserve) {
491 std::vector<Promise<int>> pendingFibers;
492 bool taskAdded = false;
494 FiberManager manager(folly::make_unique<SimpleLoopController>());
495 auto& loopController =
496 dynamic_cast<SimpleLoopController&>(manager.loopController());
498 auto loopFunc = [&]() {
500 manager.addTask([&]() {
501 std::vector<std::function<void()>> funcs;
502 for (size_t i = 0; i < 3; ++i) {
503 funcs.push_back([&pendingFibers]() {
504 await([&pendingFibers](Promise<int> promise) {
505 pendingFibers.push_back(std::move(promise));
510 auto iter = addTasks(funcs.begin(), funcs.end());
513 EXPECT_TRUE(iter.hasCompleted());
514 EXPECT_TRUE(iter.hasPending());
515 EXPECT_TRUE(iter.hasNext());
518 EXPECT_TRUE(iter.hasCompleted());
519 EXPECT_TRUE(iter.hasPending());
520 EXPECT_TRUE(iter.hasNext());
523 EXPECT_FALSE(iter.hasCompleted());
524 EXPECT_TRUE(iter.hasPending());
525 EXPECT_TRUE(iter.hasNext());
528 EXPECT_FALSE(iter.hasCompleted());
529 EXPECT_FALSE(iter.hasPending());
530 EXPECT_FALSE(iter.hasNext());
533 } else if (pendingFibers.size()) {
534 pendingFibers.back().setValue(0);
535 pendingFibers.pop_back();
537 loopController.stop();
541 loopController.loop(std::move(loopFunc));
544 TEST(FiberManager, addTaskDynamic) {
545 folly::EventBase evb;
549 auto makeTask = [&](size_t taskId) {
550 return [&, taskId]() -> size_t {
551 batons[taskId].wait();
557 .addTaskFuture([&]() {
558 TaskIterator<size_t> iterator;
560 iterator.addTask(makeTask(0));
561 iterator.addTask(makeTask(1));
565 EXPECT_EQ(1, iterator.awaitNext());
567 iterator.addTask(makeTask(2));
571 EXPECT_EQ(2, iterator.awaitNext());
575 EXPECT_EQ(0, iterator.awaitNext());
580 TEST(FiberManager, forEach) {
581 std::vector<Promise<int>> pendingFibers;
582 bool taskAdded = false;
584 FiberManager manager(folly::make_unique<SimpleLoopController>());
585 auto& loopController =
586 dynamic_cast<SimpleLoopController&>(manager.loopController());
588 auto loopFunc = [&]() {
590 manager.addTask([&]() {
591 std::vector<std::function<int()>> funcs;
592 for (size_t i = 0; i < 3; ++i) {
593 funcs.push_back([i, &pendingFibers]() {
594 await([&pendingFibers](Promise<int> promise) {
595 pendingFibers.push_back(std::move(promise));
601 std::vector<std::pair<size_t, int>> results;
602 forEach(funcs.begin(), funcs.end(), [&results](size_t id, int result) {
603 results.emplace_back(id, result);
605 EXPECT_EQ(3, results.size());
606 EXPECT_TRUE(pendingFibers.empty());
607 for (size_t i = 0; i < 3; ++i) {
608 EXPECT_EQ(results[i].first * 2 + 1, results[i].second);
612 } else if (pendingFibers.size()) {
613 pendingFibers.back().setValue(0);
614 pendingFibers.pop_back();
616 loopController.stop();
620 loopController.loop(std::move(loopFunc));
623 TEST(FiberManager, collectN) {
624 std::vector<Promise<int>> pendingFibers;
625 bool taskAdded = false;
627 FiberManager manager(folly::make_unique<SimpleLoopController>());
628 auto& loopController =
629 dynamic_cast<SimpleLoopController&>(manager.loopController());
631 auto loopFunc = [&]() {
633 manager.addTask([&]() {
634 std::vector<std::function<int()>> funcs;
635 for (size_t i = 0; i < 3; ++i) {
636 funcs.push_back([i, &pendingFibers]() {
637 await([&pendingFibers](Promise<int> promise) {
638 pendingFibers.push_back(std::move(promise));
644 auto results = collectN(funcs.begin(), funcs.end(), 2);
645 EXPECT_EQ(2, results.size());
646 EXPECT_EQ(1, pendingFibers.size());
647 for (size_t i = 0; i < 2; ++i) {
648 EXPECT_EQ(results[i].first * 2 + 1, results[i].second);
652 } else if (pendingFibers.size()) {
653 pendingFibers.back().setValue(0);
654 pendingFibers.pop_back();
656 loopController.stop();
660 loopController.loop(std::move(loopFunc));
663 TEST(FiberManager, collectNThrow) {
664 std::vector<Promise<int>> pendingFibers;
665 bool taskAdded = false;
667 FiberManager manager(folly::make_unique<SimpleLoopController>());
668 auto& loopController =
669 dynamic_cast<SimpleLoopController&>(manager.loopController());
671 auto loopFunc = [&]() {
673 manager.addTask([&]() {
674 std::vector<std::function<int()>> funcs;
675 for (size_t i = 0; i < 3; ++i) {
676 funcs.push_back([i, &pendingFibers]() {
677 await([&pendingFibers](Promise<int> promise) {
678 pendingFibers.push_back(std::move(promise));
680 throw std::runtime_error("Runtime");
686 collectN(funcs.begin(), funcs.end(), 2);
688 EXPECT_EQ(1, pendingFibers.size());
692 } else if (pendingFibers.size()) {
693 pendingFibers.back().setValue(0);
694 pendingFibers.pop_back();
696 loopController.stop();
700 loopController.loop(std::move(loopFunc));
703 TEST(FiberManager, collectNVoid) {
704 std::vector<Promise<int>> pendingFibers;
705 bool taskAdded = false;
707 FiberManager manager(folly::make_unique<SimpleLoopController>());
708 auto& loopController =
709 dynamic_cast<SimpleLoopController&>(manager.loopController());
711 auto loopFunc = [&]() {
713 manager.addTask([&]() {
714 std::vector<std::function<void()>> funcs;
715 for (size_t i = 0; i < 3; ++i) {
716 funcs.push_back([i, &pendingFibers]() {
717 await([&pendingFibers](Promise<int> promise) {
718 pendingFibers.push_back(std::move(promise));
723 auto results = collectN(funcs.begin(), funcs.end(), 2);
724 EXPECT_EQ(2, results.size());
725 EXPECT_EQ(1, pendingFibers.size());
728 } else if (pendingFibers.size()) {
729 pendingFibers.back().setValue(0);
730 pendingFibers.pop_back();
732 loopController.stop();
736 loopController.loop(std::move(loopFunc));
739 TEST(FiberManager, collectNVoidThrow) {
740 std::vector<Promise<int>> pendingFibers;
741 bool taskAdded = false;
743 FiberManager manager(folly::make_unique<SimpleLoopController>());
744 auto& loopController =
745 dynamic_cast<SimpleLoopController&>(manager.loopController());
747 auto loopFunc = [&]() {
749 manager.addTask([&]() {
750 std::vector<std::function<void()>> funcs;
751 for (size_t i = 0; i < 3; ++i) {
752 funcs.push_back([i, &pendingFibers]() {
753 await([&pendingFibers](Promise<int> promise) {
754 pendingFibers.push_back(std::move(promise));
756 throw std::runtime_error("Runtime");
761 collectN(funcs.begin(), funcs.end(), 2);
763 EXPECT_EQ(1, pendingFibers.size());
767 } else if (pendingFibers.size()) {
768 pendingFibers.back().setValue(0);
769 pendingFibers.pop_back();
771 loopController.stop();
775 loopController.loop(std::move(loopFunc));
778 TEST(FiberManager, collectAll) {
779 std::vector<Promise<int>> pendingFibers;
780 bool taskAdded = false;
782 FiberManager manager(folly::make_unique<SimpleLoopController>());
783 auto& loopController =
784 dynamic_cast<SimpleLoopController&>(manager.loopController());
786 auto loopFunc = [&]() {
788 manager.addTask([&]() {
789 std::vector<std::function<int()>> funcs;
790 for (size_t i = 0; i < 3; ++i) {
791 funcs.push_back([i, &pendingFibers]() {
792 await([&pendingFibers](Promise<int> promise) {
793 pendingFibers.push_back(std::move(promise));
799 auto results = collectAll(funcs.begin(), funcs.end());
800 EXPECT_TRUE(pendingFibers.empty());
801 for (size_t i = 0; i < 3; ++i) {
802 EXPECT_EQ(i * 2 + 1, results[i]);
806 } else if (pendingFibers.size()) {
807 pendingFibers.back().setValue(0);
808 pendingFibers.pop_back();
810 loopController.stop();
814 loopController.loop(std::move(loopFunc));
817 TEST(FiberManager, collectAllVoid) {
818 std::vector<Promise<int>> pendingFibers;
819 bool taskAdded = false;
821 FiberManager manager(folly::make_unique<SimpleLoopController>());
822 auto& loopController =
823 dynamic_cast<SimpleLoopController&>(manager.loopController());
825 auto loopFunc = [&]() {
827 manager.addTask([&]() {
828 std::vector<std::function<void()>> funcs;
829 for (size_t i = 0; i < 3; ++i) {
830 funcs.push_back([i, &pendingFibers]() {
831 await([&pendingFibers](Promise<int> promise) {
832 pendingFibers.push_back(std::move(promise));
837 collectAll(funcs.begin(), funcs.end());
838 EXPECT_TRUE(pendingFibers.empty());
841 } else if (pendingFibers.size()) {
842 pendingFibers.back().setValue(0);
843 pendingFibers.pop_back();
845 loopController.stop();
849 loopController.loop(std::move(loopFunc));
852 TEST(FiberManager, collectAny) {
853 std::vector<Promise<int>> pendingFibers;
854 bool taskAdded = false;
856 FiberManager manager(folly::make_unique<SimpleLoopController>());
857 auto& loopController =
858 dynamic_cast<SimpleLoopController&>(manager.loopController());
860 auto loopFunc = [&]() {
862 manager.addTask([&]() {
863 std::vector<std::function<int()>> funcs;
864 for (size_t i = 0; i < 3; ++i) {
865 funcs.push_back([i, &pendingFibers]() {
866 await([&pendingFibers](Promise<int> promise) {
867 pendingFibers.push_back(std::move(promise));
870 throw std::runtime_error("This exception will be ignored");
876 auto result = collectAny(funcs.begin(), funcs.end());
877 EXPECT_EQ(2, pendingFibers.size());
878 EXPECT_EQ(2, result.first);
879 EXPECT_EQ(2 * 2 + 1, result.second);
882 } else if (pendingFibers.size()) {
883 pendingFibers.back().setValue(0);
884 pendingFibers.pop_back();
886 loopController.stop();
890 loopController.loop(std::move(loopFunc));
894 /* Checks that this function was run from a main context,
895 by comparing an address on a stack to a known main stack address
896 and a known related fiber stack address. The assumption
897 is that fiber stack and main stack will be far enough apart,
898 while any two values on the same stack will be close. */
899 void expectMainContext(bool& ran, int* mainLocation, int* fiberLocation) {
901 /* 2 pages is a good guess */
902 constexpr ssize_t DISTANCE = 0x2000 / sizeof(int);
904 EXPECT_TRUE(std::abs(&here - fiberLocation) > DISTANCE);
907 EXPECT_TRUE(std::abs(&here - mainLocation) < DISTANCE);
915 TEST(FiberManager, runInMainContext) {
916 FiberManager manager(folly::make_unique<SimpleLoopController>());
917 auto& loopController =
918 dynamic_cast<SimpleLoopController&>(manager.loopController());
920 bool checkRan = false;
923 manager.runInMainContext(
924 [&]() { expectMainContext(checkRan, &mainLocation, nullptr); });
925 EXPECT_TRUE(checkRan);
929 manager.addTask([&]() {
931 explicit A(int value_) : value(value_) {}
932 A(const A&) = delete;
938 auto ret = runInMainContext([&]() {
939 expectMainContext(checkRan, &mainLocation, &stackLocation);
942 EXPECT_TRUE(checkRan);
943 EXPECT_EQ(42, ret.value);
946 loopController.loop([&]() { loopController.stop(); });
948 EXPECT_TRUE(checkRan);
951 TEST(FiberManager, addTaskFinally) {
952 FiberManager manager(folly::make_unique<SimpleLoopController>());
953 auto& loopController =
954 dynamic_cast<SimpleLoopController&>(manager.loopController());
956 bool checkRan = false;
960 manager.addTaskFinally(
961 [&]() { return 1234; },
962 [&](Try<int>&& result) {
963 EXPECT_EQ(result.value(), 1234);
965 expectMainContext(checkRan, &mainLocation, nullptr);
968 EXPECT_FALSE(checkRan);
970 loopController.loop([&]() { loopController.stop(); });
972 EXPECT_TRUE(checkRan);
975 TEST(FiberManager, fibersPoolWithinLimit) {
976 FiberManager::Options opts;
977 opts.maxFibersPoolSize = 5;
979 FiberManager manager(folly::make_unique<SimpleLoopController>(), opts);
980 auto& loopController =
981 dynamic_cast<SimpleLoopController&>(manager.loopController());
983 size_t fibersRun = 0;
985 for (size_t i = 0; i < 5; ++i) {
986 manager.addTask([&]() { ++fibersRun; });
988 loopController.loop([&]() { loopController.stop(); });
990 EXPECT_EQ(5, fibersRun);
991 EXPECT_EQ(5, manager.fibersAllocated());
992 EXPECT_EQ(5, manager.fibersPoolSize());
994 for (size_t i = 0; i < 5; ++i) {
995 manager.addTask([&]() { ++fibersRun; });
997 loopController.loop([&]() { loopController.stop(); });
999 EXPECT_EQ(10, fibersRun);
1000 EXPECT_EQ(5, manager.fibersAllocated());
1001 EXPECT_EQ(5, manager.fibersPoolSize());
1004 TEST(FiberManager, fibersPoolOverLimit) {
1005 FiberManager::Options opts;
1006 opts.maxFibersPoolSize = 5;
1008 FiberManager manager(folly::make_unique<SimpleLoopController>(), opts);
1009 auto& loopController =
1010 dynamic_cast<SimpleLoopController&>(manager.loopController());
1012 size_t fibersRun = 0;
1014 for (size_t i = 0; i < 10; ++i) {
1015 manager.addTask([&]() { ++fibersRun; });
1018 EXPECT_EQ(0, fibersRun);
1019 EXPECT_EQ(10, manager.fibersAllocated());
1020 EXPECT_EQ(0, manager.fibersPoolSize());
1022 loopController.loop([&]() { loopController.stop(); });
1024 EXPECT_EQ(10, fibersRun);
1025 EXPECT_EQ(5, manager.fibersAllocated());
1026 EXPECT_EQ(5, manager.fibersPoolSize());
1029 TEST(FiberManager, remoteFiberBasic) {
1030 FiberManager manager(folly::make_unique<SimpleLoopController>());
1031 auto& loopController =
1032 dynamic_cast<SimpleLoopController&>(manager.loopController());
1035 result[0] = result[1] = 0;
1036 folly::Optional<Promise<int>> savedPromise[2];
1037 manager.addTask([&]() {
1039 [&](Promise<int> promise) { savedPromise[0] = std::move(promise); });
1041 manager.addTask([&]() {
1043 [&](Promise<int> promise) { savedPromise[1] = std::move(promise); });
1046 manager.loopUntilNoReady();
1048 EXPECT_TRUE(savedPromise[0].hasValue());
1049 EXPECT_TRUE(savedPromise[1].hasValue());
1050 EXPECT_EQ(0, result[0]);
1051 EXPECT_EQ(0, result[1]);
1053 std::thread remoteThread0{[&]() { savedPromise[0]->setValue(42); }};
1054 std::thread remoteThread1{[&]() { savedPromise[1]->setValue(43); }};
1055 remoteThread0.join();
1056 remoteThread1.join();
1057 EXPECT_EQ(0, result[0]);
1058 EXPECT_EQ(0, result[1]);
1059 /* Should only have scheduled once */
1060 EXPECT_EQ(1, loopController.remoteScheduleCalled());
1062 manager.loopUntilNoReady();
1063 EXPECT_EQ(42, result[0]);
1064 EXPECT_EQ(43, result[1]);
1067 TEST(FiberManager, addTaskRemoteBasic) {
1068 FiberManager manager(folly::make_unique<SimpleLoopController>());
1071 result[0] = result[1] = 0;
1072 folly::Optional<Promise<int>> savedPromise[2];
1074 std::thread remoteThread0{[&]() {
1075 manager.addTaskRemote([&]() {
1077 [&](Promise<int> promise) { savedPromise[0] = std::move(promise); });
1080 std::thread remoteThread1{[&]() {
1081 manager.addTaskRemote([&]() {
1083 [&](Promise<int> promise) { savedPromise[1] = std::move(promise); });
1086 remoteThread0.join();
1087 remoteThread1.join();
1089 manager.loopUntilNoReady();
1091 EXPECT_TRUE(savedPromise[0].hasValue());
1092 EXPECT_TRUE(savedPromise[1].hasValue());
1093 EXPECT_EQ(0, result[0]);
1094 EXPECT_EQ(0, result[1]);
1096 savedPromise[0]->setValue(42);
1097 savedPromise[1]->setValue(43);
1099 EXPECT_EQ(0, result[0]);
1100 EXPECT_EQ(0, result[1]);
1102 manager.loopUntilNoReady();
1103 EXPECT_EQ(42, result[0]);
1104 EXPECT_EQ(43, result[1]);
1107 TEST(FiberManager, remoteHasTasks) {
1109 FiberManager fm(folly::make_unique<SimpleLoopController>());
1110 std::thread remote([&]() { fm.addTaskRemote([&]() { ++counter; }); });
1114 while (fm.hasTasks()) {
1115 fm.loopUntilNoReady();
1118 EXPECT_FALSE(fm.hasTasks());
1119 EXPECT_EQ(counter, 1);
1122 TEST(FiberManager, remoteHasReadyTasks) {
1124 folly::Optional<Promise<int>> savedPromise;
1125 FiberManager fm(folly::make_unique<SimpleLoopController>());
1126 std::thread remote([&]() {
1127 fm.addTaskRemote([&]() {
1129 [&](Promise<int> promise) { savedPromise = std::move(promise); });
1130 EXPECT_TRUE(fm.hasTasks());
1135 EXPECT_TRUE(fm.hasTasks());
1137 fm.loopUntilNoReady();
1138 EXPECT_TRUE(fm.hasTasks());
1140 std::thread remote2([&]() { savedPromise->setValue(47); });
1142 EXPECT_TRUE(fm.hasTasks());
1144 fm.loopUntilNoReady();
1145 EXPECT_FALSE(fm.hasTasks());
1147 EXPECT_EQ(result, 47);
1150 template <typename Data>
1151 void testFiberLocal() {
1153 LocalType<Data>(), folly::make_unique<SimpleLoopController>());
1156 EXPECT_EQ(42, local<Data>().value);
1158 local<Data>().value = 43;
1161 EXPECT_EQ(43, local<Data>().value);
1163 local<Data>().value = 44;
1165 addTask([]() { EXPECT_EQ(44, local<Data>().value); });
1170 EXPECT_EQ(42, local<Data>().value);
1172 local<Data>().value = 43;
1174 fm.addTaskRemote([]() { EXPECT_EQ(43, local<Data>().value); });
1178 EXPECT_EQ(42, local<Data>().value);
1179 local<Data>().value = 43;
1182 EXPECT_EQ(43, local<Data>().value);
1183 local<Data>().value = 44;
1185 std::vector<std::function<void()>> tasks{task};
1186 collectAny(tasks.begin(), tasks.end());
1188 EXPECT_EQ(43, local<Data>().value);
1191 fm.loopUntilNoReady();
1192 EXPECT_FALSE(fm.hasTasks());
1195 TEST(FiberManager, fiberLocal) {
1200 testFiberLocal<SimpleData>();
1203 TEST(FiberManager, fiberLocalHeap) {
1205 char _[1024 * 1024];
1209 testFiberLocal<LargeData>();
1212 TEST(FiberManager, fiberLocalDestructor) {
1219 EXPECT_EQ(42, local<CrazyData>().data);
1220 // Make sure we don't have infinite loop
1221 local<CrazyData>().data = 0;
1228 LocalType<CrazyData>(), folly::make_unique<SimpleLoopController>());
1230 fm.addTask([]() { local<CrazyData>().data = 41; });
1232 fm.loopUntilNoReady();
1233 EXPECT_FALSE(fm.hasTasks());
1236 TEST(FiberManager, yieldTest) {
1237 FiberManager manager(folly::make_unique<SimpleLoopController>());
1238 auto& loopController =
1239 dynamic_cast<SimpleLoopController&>(manager.loopController());
1241 bool checkRan = false;
1243 manager.addTask([&]() {
1248 loopController.loop([&]() {
1250 loopController.stop();
1254 EXPECT_TRUE(checkRan);
1257 TEST(FiberManager, RequestContext) {
1258 FiberManager fm(folly::make_unique<SimpleLoopController>());
1260 bool checkRun1 = false;
1261 bool checkRun2 = false;
1262 bool checkRun3 = false;
1263 bool checkRun4 = false;
1264 folly::fibers::Baton baton1;
1265 folly::fibers::Baton baton2;
1266 folly::fibers::Baton baton3;
1267 folly::fibers::Baton baton4;
1270 folly::RequestContextScopeGuard rctx;
1271 auto rcontext1 = folly::RequestContext::get();
1272 fm.addTask([&, rcontext1]() {
1273 EXPECT_EQ(rcontext1, folly::RequestContext::get());
1275 [&]() { EXPECT_EQ(rcontext1, folly::RequestContext::get()); });
1276 EXPECT_EQ(rcontext1, folly::RequestContext::get());
1278 [&]() { EXPECT_EQ(rcontext1, folly::RequestContext::get()); });
1283 folly::RequestContextScopeGuard rctx;
1284 auto rcontext2 = folly::RequestContext::get();
1285 fm.addTaskRemote([&, rcontext2]() {
1286 EXPECT_EQ(rcontext2, folly::RequestContext::get());
1288 EXPECT_EQ(rcontext2, folly::RequestContext::get());
1293 folly::RequestContextScopeGuard rctx;
1294 auto rcontext3 = folly::RequestContext::get();
1297 EXPECT_EQ(rcontext3, folly::RequestContext::get());
1299 EXPECT_EQ(rcontext3, folly::RequestContext::get());
1301 return folly::Unit();
1303 [&, rcontext3](Try<folly::Unit>&& /* t */) {
1304 EXPECT_EQ(rcontext3, folly::RequestContext::get());
1309 folly::RequestContext::setContext(nullptr);
1311 folly::RequestContextScopeGuard rctx;
1312 auto rcontext4 = folly::RequestContext::get();
1314 EXPECT_EQ(rcontext4, folly::RequestContext::get());
1319 folly::RequestContextScopeGuard rctx;
1320 auto rcontext = folly::RequestContext::get();
1322 fm.loopUntilNoReady();
1323 EXPECT_EQ(rcontext, folly::RequestContext::get());
1326 EXPECT_EQ(rcontext, folly::RequestContext::get());
1327 fm.loopUntilNoReady();
1328 EXPECT_TRUE(checkRun1);
1329 EXPECT_EQ(rcontext, folly::RequestContext::get());
1332 EXPECT_EQ(rcontext, folly::RequestContext::get());
1333 fm.loopUntilNoReady();
1334 EXPECT_TRUE(checkRun2);
1335 EXPECT_EQ(rcontext, folly::RequestContext::get());
1338 EXPECT_EQ(rcontext, folly::RequestContext::get());
1339 fm.loopUntilNoReady();
1340 EXPECT_TRUE(checkRun3);
1341 EXPECT_EQ(rcontext, folly::RequestContext::get());
1344 EXPECT_EQ(rcontext, folly::RequestContext::get());
1345 fm.loopUntilNoReady();
1346 EXPECT_TRUE(checkRun4);
1347 EXPECT_EQ(rcontext, folly::RequestContext::get());
1351 TEST(FiberManager, resizePeriodically) {
1352 FiberManager::Options opts;
1353 opts.fibersPoolResizePeriodMs = 300;
1354 opts.maxFibersPoolSize = 5;
1356 FiberManager manager(folly::make_unique<EventBaseLoopController>(), opts);
1358 folly::EventBase evb;
1359 dynamic_cast<EventBaseLoopController&>(manager.loopController())
1360 .attachEventBase(evb);
1362 std::vector<Baton> batons(10);
1364 size_t tasksRun = 0;
1365 for (size_t i = 0; i < 30; ++i) {
1366 manager.addTask([i, &batons, &tasksRun]() {
1368 // Keep some fibers active indefinitely
1369 if (i < batons.size()) {
1375 EXPECT_EQ(0, tasksRun);
1376 EXPECT_EQ(30, manager.fibersAllocated());
1377 EXPECT_EQ(0, manager.fibersPoolSize());
1380 EXPECT_EQ(30, tasksRun);
1381 EXPECT_EQ(30, manager.fibersAllocated());
1382 // Can go over maxFibersPoolSize, 10 of 30 fibers still active
1383 EXPECT_EQ(20, manager.fibersPoolSize());
1385 std::this_thread::sleep_for(std::chrono::milliseconds(400));
1386 evb.loopOnce(); // no fibers active in this period
1387 EXPECT_EQ(30, manager.fibersAllocated());
1388 EXPECT_EQ(20, manager.fibersPoolSize());
1390 std::this_thread::sleep_for(std::chrono::milliseconds(400));
1391 evb.loopOnce(); // should shrink fibers pool to maxFibersPoolSize
1392 EXPECT_EQ(15, manager.fibersAllocated());
1393 EXPECT_EQ(5, manager.fibersPoolSize());
1395 for (size_t i = 0; i < batons.size(); ++i) {
1399 EXPECT_EQ(15, manager.fibersAllocated());
1400 EXPECT_EQ(15, manager.fibersPoolSize());
1402 std::this_thread::sleep_for(std::chrono::milliseconds(400));
1403 evb.loopOnce(); // 10 fibers active in last period
1404 EXPECT_EQ(10, manager.fibersAllocated());
1405 EXPECT_EQ(10, manager.fibersPoolSize());
1407 std::this_thread::sleep_for(std::chrono::milliseconds(400));
1409 EXPECT_EQ(5, manager.fibersAllocated());
1410 EXPECT_EQ(5, manager.fibersPoolSize());
1413 TEST(FiberManager, batonWaitTimeoutHandler) {
1414 FiberManager manager(folly::make_unique<EventBaseLoopController>());
1416 folly::EventBase evb;
1417 dynamic_cast<EventBaseLoopController&>(manager.loopController())
1418 .attachEventBase(evb);
1420 size_t fibersRun = 0;
1422 Baton::TimeoutHandler timeoutHandler;
1424 manager.addTask([&]() {
1425 baton.wait(timeoutHandler);
1428 manager.loopUntilNoReady();
1430 EXPECT_FALSE(baton.try_wait());
1431 EXPECT_EQ(0, fibersRun);
1433 timeoutHandler.scheduleTimeout(std::chrono::milliseconds(250));
1434 std::this_thread::sleep_for(std::chrono::milliseconds(500));
1436 EXPECT_FALSE(baton.try_wait());
1437 EXPECT_EQ(0, fibersRun);
1440 manager.loopUntilNoReady();
1442 EXPECT_EQ(1, fibersRun);
1445 TEST(FiberManager, batonWaitTimeoutMany) {
1446 FiberManager manager(folly::make_unique<EventBaseLoopController>());
1448 folly::EventBase evb;
1449 dynamic_cast<EventBaseLoopController&>(manager.loopController())
1450 .attachEventBase(evb);
1452 constexpr size_t kNumTimeoutTasks = 10000;
1453 size_t tasksCount = kNumTimeoutTasks;
1455 // We add many tasks to hit timeout queue deallocation logic.
1456 for (size_t i = 0; i < kNumTimeoutTasks; ++i) {
1457 manager.addTask([&]() {
1459 Baton::TimeoutHandler timeoutHandler;
1461 folly::fibers::addTask([&] {
1462 timeoutHandler.scheduleTimeout(std::chrono::milliseconds(1000));
1465 baton.wait(timeoutHandler);
1466 if (--tasksCount == 0) {
1467 evb.terminateLoopSoon();
1475 TEST(FiberManager, remoteFutureTest) {
1476 FiberManager fiberManager(folly::make_unique<SimpleLoopController>());
1477 auto& loopController =
1478 dynamic_cast<SimpleLoopController&>(fiberManager.loopController());
1482 auto f1 = fiberManager.addTaskFuture([&]() { return testValue1; });
1483 auto f2 = fiberManager.addTaskRemoteFuture([&]() { return testValue2; });
1484 loopController.loop([&]() { loopController.stop(); });
1488 EXPECT_EQ(v1, testValue1);
1489 EXPECT_EQ(v2, testValue2);
1492 // Test that a void function produes a Future<Unit>.
1493 TEST(FiberManager, remoteFutureVoidUnitTest) {
1494 FiberManager fiberManager(folly::make_unique<SimpleLoopController>());
1495 auto& loopController =
1496 dynamic_cast<SimpleLoopController&>(fiberManager.loopController());
1498 bool ranLocal = false;
1499 folly::Future<folly::Unit> futureLocal =
1500 fiberManager.addTaskFuture([&]() { ranLocal = true; });
1502 bool ranRemote = false;
1503 folly::Future<folly::Unit> futureRemote =
1504 fiberManager.addTaskRemoteFuture([&]() { ranRemote = true; });
1506 loopController.loop([&]() { loopController.stop(); });
1509 ASSERT_TRUE(ranLocal);
1511 futureRemote.wait();
1512 ASSERT_TRUE(ranRemote);
1515 TEST(FiberManager, nestedFiberManagers) {
1516 folly::EventBase outerEvb;
1517 folly::EventBase innerEvb;
1519 getFiberManager(outerEvb).addTask([&]() {
1521 &getFiberManager(outerEvb), FiberManager::getFiberManagerUnsafe());
1523 runInMainContext([&]() {
1524 getFiberManager(innerEvb).addTask([&]() {
1526 &getFiberManager(innerEvb), FiberManager::getFiberManagerUnsafe());
1528 innerEvb.terminateLoopSoon();
1531 innerEvb.loopForever();
1535 &getFiberManager(outerEvb), FiberManager::getFiberManagerUnsafe());
1537 outerEvb.terminateLoopSoon();
1540 outerEvb.loopForever();