2 * Copyright 2016 Facebook, Inc.
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
8 * http://www.apache.org/licenses/LICENSE-2.0
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
20 #include <folly/Memory.h>
21 #include <folly/futures/Future.h>
23 #include <folly/Conv.h>
24 #include <folly/fibers/AddTasks.h>
25 #include <folly/fibers/BatchDispatcher.h>
26 #include <folly/fibers/EventBaseLoopController.h>
27 #include <folly/fibers/FiberManager.h>
28 #include <folly/fibers/FiberManagerMap.h>
29 #include <folly/fibers/GenericBaton.h>
30 #include <folly/fibers/Semaphore.h>
31 #include <folly/fibers/SimpleLoopController.h>
32 #include <folly/fibers/WhenN.h>
33 #include <folly/portability/GTest.h>
35 using namespace folly::fibers;
39 TEST(FiberManager, batonTimedWaitTimeout) {
40 bool taskAdded = false;
41 size_t iterations = 0;
43 FiberManager manager(folly::make_unique<SimpleLoopController>());
44 auto& loopController =
45 dynamic_cast<SimpleLoopController&>(manager.loopController());
47 auto loopFunc = [&]() {
49 manager.addTask([&]() {
52 auto res = baton.timed_wait(std::chrono::milliseconds(230));
55 EXPECT_EQ(5, iterations);
57 loopController.stop();
59 manager.addTask([&]() {
62 auto res = baton.timed_wait(std::chrono::milliseconds(130));
65 EXPECT_EQ(3, iterations);
67 loopController.stop();
71 std::this_thread::sleep_for(std::chrono::milliseconds(50));
76 loopController.loop(std::move(loopFunc));
79 TEST(FiberManager, batonTimedWaitPost) {
80 bool taskAdded = false;
81 size_t iterations = 0;
84 FiberManager manager(folly::make_unique<SimpleLoopController>());
85 auto& loopController =
86 dynamic_cast<SimpleLoopController&>(manager.loopController());
88 auto loopFunc = [&]() {
90 manager.addTask([&]() {
94 auto res = baton.timed_wait(std::chrono::milliseconds(130));
97 EXPECT_EQ(2, iterations);
99 loopController.stop();
103 std::this_thread::sleep_for(std::chrono::milliseconds(50));
105 if (iterations == 2) {
111 loopController.loop(std::move(loopFunc));
114 TEST(FiberManager, batonTimedWaitTimeoutEvb) {
115 size_t tasksComplete = 0;
117 folly::EventBase evb;
119 FiberManager manager(folly::make_unique<EventBaseLoopController>());
120 dynamic_cast<EventBaseLoopController&>(manager.loopController())
121 .attachEventBase(evb);
123 auto task = [&](size_t timeout_ms) {
126 auto start = EventBaseLoopController::Clock::now();
127 auto res = baton.timed_wait(std::chrono::milliseconds(timeout_ms));
128 auto finish = EventBaseLoopController::Clock::now();
133 std::chrono::duration_cast<std::chrono::milliseconds>(finish - start);
135 EXPECT_GT(duration_ms.count(), timeout_ms - 50);
136 EXPECT_LT(duration_ms.count(), timeout_ms + 50);
138 if (++tasksComplete == 2) {
139 evb.terminateLoopSoon();
143 evb.runInEventBaseThread([&]() {
144 manager.addTask([&]() { task(500); });
145 manager.addTask([&]() { task(250); });
150 EXPECT_EQ(2, tasksComplete);
153 TEST(FiberManager, batonTimedWaitPostEvb) {
154 size_t tasksComplete = 0;
156 folly::EventBase evb;
158 FiberManager manager(folly::make_unique<EventBaseLoopController>());
159 dynamic_cast<EventBaseLoopController&>(manager.loopController())
160 .attachEventBase(evb);
162 evb.runInEventBaseThread([&]() {
163 manager.addTask([&]() {
166 evb.tryRunAfterDelay([&]() { baton.post(); }, 100);
168 auto start = EventBaseLoopController::Clock::now();
169 auto res = baton.timed_wait(std::chrono::milliseconds(130));
170 auto finish = EventBaseLoopController::Clock::now();
175 std::chrono::duration_cast<std::chrono::milliseconds>(finish - start);
177 EXPECT_TRUE(duration_ms.count() > 95 && duration_ms.count() < 110);
179 if (++tasksComplete == 1) {
180 evb.terminateLoopSoon();
187 EXPECT_EQ(1, tasksComplete);
190 TEST(FiberManager, batonTryWait) {
191 FiberManager manager(folly::make_unique<SimpleLoopController>());
193 // Check if try_wait and post work as expected
196 manager.addTask([&]() {
197 while (!b.try_wait()) {
200 auto thr = std::thread([&]() {
201 std::this_thread::sleep_for(std::chrono::milliseconds(300));
205 manager.loopUntilNoReady();
210 // Check try_wait without post
211 manager.addTask([&]() {
213 while (cnt && !c.try_wait()) {
216 EXPECT_TRUE(!c.try_wait()); // must still hold
220 manager.loopUntilNoReady();
223 TEST(FiberManager, genericBatonFiberWait) {
224 FiberManager manager(folly::make_unique<SimpleLoopController>());
227 bool fiberRunning = false;
229 manager.addTask([&]() {
230 EXPECT_EQ(manager.hasActiveFiber(), true);
233 fiberRunning = false;
236 EXPECT_FALSE(fiberRunning);
237 manager.loopUntilNoReady();
238 EXPECT_TRUE(fiberRunning); // ensure fiber still active
240 auto thr = std::thread([&]() {
241 std::this_thread::sleep_for(std::chrono::milliseconds(300));
245 while (fiberRunning) {
246 manager.loopUntilNoReady();
252 TEST(FiberManager, genericBatonThreadWait) {
253 FiberManager manager(folly::make_unique<SimpleLoopController>());
255 std::atomic<bool> threadWaiting(false);
257 auto thr = std::thread([&]() {
258 threadWaiting = true;
260 threadWaiting = false;
263 while (!threadWaiting) {
265 std::this_thread::sleep_for(std::chrono::milliseconds(300));
267 manager.addTask([&]() {
268 EXPECT_EQ(manager.hasActiveFiber(), true);
269 EXPECT_TRUE(threadWaiting);
271 while (threadWaiting) {
275 manager.loopUntilNoReady();
279 TEST(FiberManager, addTasksNoncopyable) {
280 std::vector<Promise<int>> pendingFibers;
281 bool taskAdded = false;
283 FiberManager manager(folly::make_unique<SimpleLoopController>());
284 auto& loopController =
285 dynamic_cast<SimpleLoopController&>(manager.loopController());
287 auto loopFunc = [&]() {
289 manager.addTask([&]() {
290 std::vector<std::function<std::unique_ptr<int>()>> funcs;
291 for (size_t i = 0; i < 3; ++i) {
292 funcs.push_back([i, &pendingFibers]() {
293 await([&pendingFibers](Promise<int> promise) {
294 pendingFibers.push_back(std::move(promise));
296 return folly::make_unique<int>(i * 2 + 1);
300 auto iter = addTasks(funcs.begin(), funcs.end());
303 while (iter.hasNext()) {
304 auto result = iter.awaitNext();
305 EXPECT_EQ(2 * iter.getTaskID() + 1, *result);
306 EXPECT_GE(2 - n, pendingFibers.size());
312 } else if (pendingFibers.size()) {
313 pendingFibers.back().setValue(0);
314 pendingFibers.pop_back();
316 loopController.stop();
320 loopController.loop(std::move(loopFunc));
323 TEST(FiberManager, awaitThrow) {
324 folly::EventBase evb;
325 struct ExpectedException {};
329 await([](Promise<int> p) {
331 throw ExpectedException();
337 await([&](Promise<int> p) {
338 evb.runInEventBaseThread([p = std::move(p)]() mutable {
341 throw ExpectedException();
348 TEST(FiberManager, addTasksThrow) {
349 std::vector<Promise<int>> pendingFibers;
350 bool taskAdded = false;
352 FiberManager manager(folly::make_unique<SimpleLoopController>());
353 auto& loopController =
354 dynamic_cast<SimpleLoopController&>(manager.loopController());
356 auto loopFunc = [&]() {
358 manager.addTask([&]() {
359 std::vector<std::function<int()>> funcs;
360 for (size_t i = 0; i < 3; ++i) {
361 funcs.push_back([i, &pendingFibers]() {
362 await([&pendingFibers](Promise<int> promise) {
363 pendingFibers.push_back(std::move(promise));
366 throw std::runtime_error("Runtime");
372 auto iter = addTasks(funcs.begin(), funcs.end());
375 while (iter.hasNext()) {
377 int result = iter.awaitNext();
378 EXPECT_EQ(1, iter.getTaskID() % 2);
379 EXPECT_EQ(2 * iter.getTaskID() + 1, result);
381 EXPECT_EQ(0, iter.getTaskID() % 2);
383 EXPECT_GE(2 - n, pendingFibers.size());
389 } else if (pendingFibers.size()) {
390 pendingFibers.back().setValue(0);
391 pendingFibers.pop_back();
393 loopController.stop();
397 loopController.loop(std::move(loopFunc));
400 TEST(FiberManager, addTasksVoid) {
401 std::vector<Promise<int>> pendingFibers;
402 bool taskAdded = false;
404 FiberManager manager(folly::make_unique<SimpleLoopController>());
405 auto& loopController =
406 dynamic_cast<SimpleLoopController&>(manager.loopController());
408 auto loopFunc = [&]() {
410 manager.addTask([&]() {
411 std::vector<std::function<void()>> funcs;
412 for (size_t i = 0; i < 3; ++i) {
413 funcs.push_back([i, &pendingFibers]() {
414 await([&pendingFibers](Promise<int> promise) {
415 pendingFibers.push_back(std::move(promise));
420 auto iter = addTasks(funcs.begin(), funcs.end());
423 while (iter.hasNext()) {
425 EXPECT_GE(2 - n, pendingFibers.size());
431 } else if (pendingFibers.size()) {
432 pendingFibers.back().setValue(0);
433 pendingFibers.pop_back();
435 loopController.stop();
439 loopController.loop(std::move(loopFunc));
442 TEST(FiberManager, addTasksVoidThrow) {
443 std::vector<Promise<int>> pendingFibers;
444 bool taskAdded = false;
446 FiberManager manager(folly::make_unique<SimpleLoopController>());
447 auto& loopController =
448 dynamic_cast<SimpleLoopController&>(manager.loopController());
450 auto loopFunc = [&]() {
452 manager.addTask([&]() {
453 std::vector<std::function<void()>> funcs;
454 for (size_t i = 0; i < 3; ++i) {
455 funcs.push_back([i, &pendingFibers]() {
456 await([&pendingFibers](Promise<int> promise) {
457 pendingFibers.push_back(std::move(promise));
460 throw std::runtime_error("");
465 auto iter = addTasks(funcs.begin(), funcs.end());
468 while (iter.hasNext()) {
471 EXPECT_EQ(1, iter.getTaskID() % 2);
473 EXPECT_EQ(0, iter.getTaskID() % 2);
475 EXPECT_GE(2 - n, pendingFibers.size());
481 } else if (pendingFibers.size()) {
482 pendingFibers.back().setValue(0);
483 pendingFibers.pop_back();
485 loopController.stop();
489 loopController.loop(std::move(loopFunc));
492 TEST(FiberManager, addTasksReserve) {
493 std::vector<Promise<int>> pendingFibers;
494 bool taskAdded = false;
496 FiberManager manager(folly::make_unique<SimpleLoopController>());
497 auto& loopController =
498 dynamic_cast<SimpleLoopController&>(manager.loopController());
500 auto loopFunc = [&]() {
502 manager.addTask([&]() {
503 std::vector<std::function<void()>> funcs;
504 for (size_t i = 0; i < 3; ++i) {
505 funcs.push_back([&pendingFibers]() {
506 await([&pendingFibers](Promise<int> promise) {
507 pendingFibers.push_back(std::move(promise));
512 auto iter = addTasks(funcs.begin(), funcs.end());
515 EXPECT_TRUE(iter.hasCompleted());
516 EXPECT_TRUE(iter.hasPending());
517 EXPECT_TRUE(iter.hasNext());
520 EXPECT_TRUE(iter.hasCompleted());
521 EXPECT_TRUE(iter.hasPending());
522 EXPECT_TRUE(iter.hasNext());
525 EXPECT_FALSE(iter.hasCompleted());
526 EXPECT_TRUE(iter.hasPending());
527 EXPECT_TRUE(iter.hasNext());
530 EXPECT_FALSE(iter.hasCompleted());
531 EXPECT_FALSE(iter.hasPending());
532 EXPECT_FALSE(iter.hasNext());
535 } else if (pendingFibers.size()) {
536 pendingFibers.back().setValue(0);
537 pendingFibers.pop_back();
539 loopController.stop();
543 loopController.loop(std::move(loopFunc));
546 TEST(FiberManager, addTaskDynamic) {
547 folly::EventBase evb;
551 auto makeTask = [&](size_t taskId) {
552 return [&, taskId]() -> size_t {
553 batons[taskId].wait();
559 .addTaskFuture([&]() {
560 TaskIterator<size_t> iterator;
562 iterator.addTask(makeTask(0));
563 iterator.addTask(makeTask(1));
567 EXPECT_EQ(1, iterator.awaitNext());
569 iterator.addTask(makeTask(2));
573 EXPECT_EQ(2, iterator.awaitNext());
577 EXPECT_EQ(0, iterator.awaitNext());
582 TEST(FiberManager, forEach) {
583 std::vector<Promise<int>> pendingFibers;
584 bool taskAdded = false;
586 FiberManager manager(folly::make_unique<SimpleLoopController>());
587 auto& loopController =
588 dynamic_cast<SimpleLoopController&>(manager.loopController());
590 auto loopFunc = [&]() {
592 manager.addTask([&]() {
593 std::vector<std::function<int()>> funcs;
594 for (size_t i = 0; i < 3; ++i) {
595 funcs.push_back([i, &pendingFibers]() {
596 await([&pendingFibers](Promise<int> promise) {
597 pendingFibers.push_back(std::move(promise));
603 std::vector<std::pair<size_t, int>> results;
604 forEach(funcs.begin(), funcs.end(), [&results](size_t id, int result) {
605 results.emplace_back(id, result);
607 EXPECT_EQ(3, results.size());
608 EXPECT_TRUE(pendingFibers.empty());
609 for (size_t i = 0; i < 3; ++i) {
610 EXPECT_EQ(results[i].first * 2 + 1, results[i].second);
614 } else if (pendingFibers.size()) {
615 pendingFibers.back().setValue(0);
616 pendingFibers.pop_back();
618 loopController.stop();
622 loopController.loop(std::move(loopFunc));
625 TEST(FiberManager, collectN) {
626 std::vector<Promise<int>> pendingFibers;
627 bool taskAdded = false;
629 FiberManager manager(folly::make_unique<SimpleLoopController>());
630 auto& loopController =
631 dynamic_cast<SimpleLoopController&>(manager.loopController());
633 auto loopFunc = [&]() {
635 manager.addTask([&]() {
636 std::vector<std::function<int()>> funcs;
637 for (size_t i = 0; i < 3; ++i) {
638 funcs.push_back([i, &pendingFibers]() {
639 await([&pendingFibers](Promise<int> promise) {
640 pendingFibers.push_back(std::move(promise));
646 auto results = collectN(funcs.begin(), funcs.end(), 2);
647 EXPECT_EQ(2, results.size());
648 EXPECT_EQ(1, pendingFibers.size());
649 for (size_t i = 0; i < 2; ++i) {
650 EXPECT_EQ(results[i].first * 2 + 1, results[i].second);
654 } else if (pendingFibers.size()) {
655 pendingFibers.back().setValue(0);
656 pendingFibers.pop_back();
658 loopController.stop();
662 loopController.loop(std::move(loopFunc));
665 TEST(FiberManager, collectNThrow) {
666 std::vector<Promise<int>> pendingFibers;
667 bool taskAdded = false;
669 FiberManager manager(folly::make_unique<SimpleLoopController>());
670 auto& loopController =
671 dynamic_cast<SimpleLoopController&>(manager.loopController());
673 auto loopFunc = [&]() {
675 manager.addTask([&]() {
676 std::vector<std::function<int()>> funcs;
677 for (size_t i = 0; i < 3; ++i) {
678 funcs.push_back([i, &pendingFibers]() {
679 await([&pendingFibers](Promise<int> promise) {
680 pendingFibers.push_back(std::move(promise));
682 throw std::runtime_error("Runtime");
688 collectN(funcs.begin(), funcs.end(), 2);
690 EXPECT_EQ(1, pendingFibers.size());
694 } else if (pendingFibers.size()) {
695 pendingFibers.back().setValue(0);
696 pendingFibers.pop_back();
698 loopController.stop();
702 loopController.loop(std::move(loopFunc));
705 TEST(FiberManager, collectNVoid) {
706 std::vector<Promise<int>> pendingFibers;
707 bool taskAdded = false;
709 FiberManager manager(folly::make_unique<SimpleLoopController>());
710 auto& loopController =
711 dynamic_cast<SimpleLoopController&>(manager.loopController());
713 auto loopFunc = [&]() {
715 manager.addTask([&]() {
716 std::vector<std::function<void()>> funcs;
717 for (size_t i = 0; i < 3; ++i) {
718 funcs.push_back([i, &pendingFibers]() {
719 await([&pendingFibers](Promise<int> promise) {
720 pendingFibers.push_back(std::move(promise));
725 auto results = collectN(funcs.begin(), funcs.end(), 2);
726 EXPECT_EQ(2, results.size());
727 EXPECT_EQ(1, pendingFibers.size());
730 } else if (pendingFibers.size()) {
731 pendingFibers.back().setValue(0);
732 pendingFibers.pop_back();
734 loopController.stop();
738 loopController.loop(std::move(loopFunc));
741 TEST(FiberManager, collectNVoidThrow) {
742 std::vector<Promise<int>> pendingFibers;
743 bool taskAdded = false;
745 FiberManager manager(folly::make_unique<SimpleLoopController>());
746 auto& loopController =
747 dynamic_cast<SimpleLoopController&>(manager.loopController());
749 auto loopFunc = [&]() {
751 manager.addTask([&]() {
752 std::vector<std::function<void()>> funcs;
753 for (size_t i = 0; i < 3; ++i) {
754 funcs.push_back([i, &pendingFibers]() {
755 await([&pendingFibers](Promise<int> promise) {
756 pendingFibers.push_back(std::move(promise));
758 throw std::runtime_error("Runtime");
763 collectN(funcs.begin(), funcs.end(), 2);
765 EXPECT_EQ(1, pendingFibers.size());
769 } else if (pendingFibers.size()) {
770 pendingFibers.back().setValue(0);
771 pendingFibers.pop_back();
773 loopController.stop();
777 loopController.loop(std::move(loopFunc));
780 TEST(FiberManager, collectAll) {
781 std::vector<Promise<int>> pendingFibers;
782 bool taskAdded = false;
784 FiberManager manager(folly::make_unique<SimpleLoopController>());
785 auto& loopController =
786 dynamic_cast<SimpleLoopController&>(manager.loopController());
788 auto loopFunc = [&]() {
790 manager.addTask([&]() {
791 std::vector<std::function<int()>> funcs;
792 for (size_t i = 0; i < 3; ++i) {
793 funcs.push_back([i, &pendingFibers]() {
794 await([&pendingFibers](Promise<int> promise) {
795 pendingFibers.push_back(std::move(promise));
801 auto results = collectAll(funcs.begin(), funcs.end());
802 EXPECT_TRUE(pendingFibers.empty());
803 for (size_t i = 0; i < 3; ++i) {
804 EXPECT_EQ(i * 2 + 1, results[i]);
808 } else if (pendingFibers.size()) {
809 pendingFibers.back().setValue(0);
810 pendingFibers.pop_back();
812 loopController.stop();
816 loopController.loop(std::move(loopFunc));
819 TEST(FiberManager, collectAllVoid) {
820 std::vector<Promise<int>> pendingFibers;
821 bool taskAdded = false;
823 FiberManager manager(folly::make_unique<SimpleLoopController>());
824 auto& loopController =
825 dynamic_cast<SimpleLoopController&>(manager.loopController());
827 auto loopFunc = [&]() {
829 manager.addTask([&]() {
830 std::vector<std::function<void()>> funcs;
831 for (size_t i = 0; i < 3; ++i) {
832 funcs.push_back([i, &pendingFibers]() {
833 await([&pendingFibers](Promise<int> promise) {
834 pendingFibers.push_back(std::move(promise));
839 collectAll(funcs.begin(), funcs.end());
840 EXPECT_TRUE(pendingFibers.empty());
843 } else if (pendingFibers.size()) {
844 pendingFibers.back().setValue(0);
845 pendingFibers.pop_back();
847 loopController.stop();
851 loopController.loop(std::move(loopFunc));
854 TEST(FiberManager, collectAny) {
855 std::vector<Promise<int>> pendingFibers;
856 bool taskAdded = false;
858 FiberManager manager(folly::make_unique<SimpleLoopController>());
859 auto& loopController =
860 dynamic_cast<SimpleLoopController&>(manager.loopController());
862 auto loopFunc = [&]() {
864 manager.addTask([&]() {
865 std::vector<std::function<int()>> funcs;
866 for (size_t i = 0; i < 3; ++i) {
867 funcs.push_back([i, &pendingFibers]() {
868 await([&pendingFibers](Promise<int> promise) {
869 pendingFibers.push_back(std::move(promise));
872 throw std::runtime_error("This exception will be ignored");
878 auto result = collectAny(funcs.begin(), funcs.end());
879 EXPECT_EQ(2, pendingFibers.size());
880 EXPECT_EQ(2, result.first);
881 EXPECT_EQ(2 * 2 + 1, result.second);
884 } else if (pendingFibers.size()) {
885 pendingFibers.back().setValue(0);
886 pendingFibers.pop_back();
888 loopController.stop();
892 loopController.loop(std::move(loopFunc));
896 /* Checks that this function was run from a main context,
897 by comparing an address on a stack to a known main stack address
898 and a known related fiber stack address. The assumption
899 is that fiber stack and main stack will be far enough apart,
900 while any two values on the same stack will be close. */
901 void expectMainContext(bool& ran, int* mainLocation, int* fiberLocation) {
903 /* 2 pages is a good guess */
904 constexpr ssize_t DISTANCE = 0x2000 / sizeof(int);
906 EXPECT_TRUE(std::abs(&here - fiberLocation) > DISTANCE);
909 EXPECT_TRUE(std::abs(&here - mainLocation) < DISTANCE);
917 TEST(FiberManager, runInMainContext) {
918 FiberManager manager(folly::make_unique<SimpleLoopController>());
919 auto& loopController =
920 dynamic_cast<SimpleLoopController&>(manager.loopController());
922 bool checkRan = false;
925 manager.runInMainContext(
926 [&]() { expectMainContext(checkRan, &mainLocation, nullptr); });
927 EXPECT_TRUE(checkRan);
931 manager.addTask([&]() {
933 explicit A(int value_) : value(value_) {}
934 A(const A&) = delete;
940 auto ret = runInMainContext([&]() {
941 expectMainContext(checkRan, &mainLocation, &stackLocation);
944 EXPECT_TRUE(checkRan);
945 EXPECT_EQ(42, ret.value);
948 loopController.loop([&]() { loopController.stop(); });
950 EXPECT_TRUE(checkRan);
953 TEST(FiberManager, addTaskFinally) {
954 FiberManager manager(folly::make_unique<SimpleLoopController>());
955 auto& loopController =
956 dynamic_cast<SimpleLoopController&>(manager.loopController());
958 bool checkRan = false;
962 manager.addTaskFinally(
963 [&]() { return 1234; },
964 [&](Try<int>&& result) {
965 EXPECT_EQ(result.value(), 1234);
967 expectMainContext(checkRan, &mainLocation, nullptr);
970 EXPECT_FALSE(checkRan);
972 loopController.loop([&]() { loopController.stop(); });
974 EXPECT_TRUE(checkRan);
977 TEST(FiberManager, fibersPoolWithinLimit) {
978 FiberManager::Options opts;
979 opts.maxFibersPoolSize = 5;
981 FiberManager manager(folly::make_unique<SimpleLoopController>(), opts);
982 auto& loopController =
983 dynamic_cast<SimpleLoopController&>(manager.loopController());
985 size_t fibersRun = 0;
987 for (size_t i = 0; i < 5; ++i) {
988 manager.addTask([&]() { ++fibersRun; });
990 loopController.loop([&]() { loopController.stop(); });
992 EXPECT_EQ(5, fibersRun);
993 EXPECT_EQ(5, manager.fibersAllocated());
994 EXPECT_EQ(5, manager.fibersPoolSize());
996 for (size_t i = 0; i < 5; ++i) {
997 manager.addTask([&]() { ++fibersRun; });
999 loopController.loop([&]() { loopController.stop(); });
1001 EXPECT_EQ(10, fibersRun);
1002 EXPECT_EQ(5, manager.fibersAllocated());
1003 EXPECT_EQ(5, manager.fibersPoolSize());
1006 TEST(FiberManager, fibersPoolOverLimit) {
1007 FiberManager::Options opts;
1008 opts.maxFibersPoolSize = 5;
1010 FiberManager manager(folly::make_unique<SimpleLoopController>(), opts);
1011 auto& loopController =
1012 dynamic_cast<SimpleLoopController&>(manager.loopController());
1014 size_t fibersRun = 0;
1016 for (size_t i = 0; i < 10; ++i) {
1017 manager.addTask([&]() { ++fibersRun; });
1020 EXPECT_EQ(0, fibersRun);
1021 EXPECT_EQ(10, manager.fibersAllocated());
1022 EXPECT_EQ(0, manager.fibersPoolSize());
1024 loopController.loop([&]() { loopController.stop(); });
1026 EXPECT_EQ(10, fibersRun);
1027 EXPECT_EQ(5, manager.fibersAllocated());
1028 EXPECT_EQ(5, manager.fibersPoolSize());
1031 TEST(FiberManager, remoteFiberBasic) {
1032 FiberManager manager(folly::make_unique<SimpleLoopController>());
1033 auto& loopController =
1034 dynamic_cast<SimpleLoopController&>(manager.loopController());
1037 result[0] = result[1] = 0;
1038 folly::Optional<Promise<int>> savedPromise[2];
1039 manager.addTask([&]() {
1041 [&](Promise<int> promise) { savedPromise[0] = std::move(promise); });
1043 manager.addTask([&]() {
1045 [&](Promise<int> promise) { savedPromise[1] = std::move(promise); });
1048 manager.loopUntilNoReady();
1050 EXPECT_TRUE(savedPromise[0].hasValue());
1051 EXPECT_TRUE(savedPromise[1].hasValue());
1052 EXPECT_EQ(0, result[0]);
1053 EXPECT_EQ(0, result[1]);
1055 std::thread remoteThread0{[&]() { savedPromise[0]->setValue(42); }};
1056 std::thread remoteThread1{[&]() { savedPromise[1]->setValue(43); }};
1057 remoteThread0.join();
1058 remoteThread1.join();
1059 EXPECT_EQ(0, result[0]);
1060 EXPECT_EQ(0, result[1]);
1061 /* Should only have scheduled once */
1062 EXPECT_EQ(1, loopController.remoteScheduleCalled());
1064 manager.loopUntilNoReady();
1065 EXPECT_EQ(42, result[0]);
1066 EXPECT_EQ(43, result[1]);
1069 TEST(FiberManager, addTaskRemoteBasic) {
1070 FiberManager manager(folly::make_unique<SimpleLoopController>());
1073 result[0] = result[1] = 0;
1074 folly::Optional<Promise<int>> savedPromise[2];
1076 std::thread remoteThread0{[&]() {
1077 manager.addTaskRemote([&]() {
1079 [&](Promise<int> promise) { savedPromise[0] = std::move(promise); });
1082 std::thread remoteThread1{[&]() {
1083 manager.addTaskRemote([&]() {
1085 [&](Promise<int> promise) { savedPromise[1] = std::move(promise); });
1088 remoteThread0.join();
1089 remoteThread1.join();
1091 manager.loopUntilNoReady();
1093 EXPECT_TRUE(savedPromise[0].hasValue());
1094 EXPECT_TRUE(savedPromise[1].hasValue());
1095 EXPECT_EQ(0, result[0]);
1096 EXPECT_EQ(0, result[1]);
1098 savedPromise[0]->setValue(42);
1099 savedPromise[1]->setValue(43);
1101 EXPECT_EQ(0, result[0]);
1102 EXPECT_EQ(0, result[1]);
1104 manager.loopUntilNoReady();
1105 EXPECT_EQ(42, result[0]);
1106 EXPECT_EQ(43, result[1]);
1109 TEST(FiberManager, remoteHasTasks) {
1111 FiberManager fm(folly::make_unique<SimpleLoopController>());
1112 std::thread remote([&]() { fm.addTaskRemote([&]() { ++counter; }); });
1116 while (fm.hasTasks()) {
1117 fm.loopUntilNoReady();
1120 EXPECT_FALSE(fm.hasTasks());
1121 EXPECT_EQ(counter, 1);
1124 TEST(FiberManager, remoteHasReadyTasks) {
1126 folly::Optional<Promise<int>> savedPromise;
1127 FiberManager fm(folly::make_unique<SimpleLoopController>());
1128 std::thread remote([&]() {
1129 fm.addTaskRemote([&]() {
1131 [&](Promise<int> promise) { savedPromise = std::move(promise); });
1132 EXPECT_TRUE(fm.hasTasks());
1137 EXPECT_TRUE(fm.hasTasks());
1139 fm.loopUntilNoReady();
1140 EXPECT_TRUE(fm.hasTasks());
1142 std::thread remote2([&]() { savedPromise->setValue(47); });
1144 EXPECT_TRUE(fm.hasTasks());
1146 fm.loopUntilNoReady();
1147 EXPECT_FALSE(fm.hasTasks());
1149 EXPECT_EQ(result, 47);
1152 template <typename Data>
1153 void testFiberLocal() {
1155 LocalType<Data>(), folly::make_unique<SimpleLoopController>());
1158 EXPECT_EQ(42, local<Data>().value);
1160 local<Data>().value = 43;
1163 EXPECT_EQ(43, local<Data>().value);
1165 local<Data>().value = 44;
1167 addTask([]() { EXPECT_EQ(44, local<Data>().value); });
1172 EXPECT_EQ(42, local<Data>().value);
1174 local<Data>().value = 43;
1176 fm.addTaskRemote([]() { EXPECT_EQ(43, local<Data>().value); });
1180 EXPECT_EQ(42, local<Data>().value);
1181 local<Data>().value = 43;
1184 EXPECT_EQ(43, local<Data>().value);
1185 local<Data>().value = 44;
1187 std::vector<std::function<void()>> tasks{task};
1188 collectAny(tasks.begin(), tasks.end());
1190 EXPECT_EQ(43, local<Data>().value);
1193 fm.loopUntilNoReady();
1194 EXPECT_FALSE(fm.hasTasks());
1197 TEST(FiberManager, fiberLocal) {
1202 testFiberLocal<SimpleData>();
1205 TEST(FiberManager, fiberLocalHeap) {
1207 char _[1024 * 1024];
1211 testFiberLocal<LargeData>();
1214 TEST(FiberManager, fiberLocalDestructor) {
1221 EXPECT_EQ(42, local<CrazyData>().data);
1222 // Make sure we don't have infinite loop
1223 local<CrazyData>().data = 0;
1230 LocalType<CrazyData>(), folly::make_unique<SimpleLoopController>());
1232 fm.addTask([]() { local<CrazyData>().data = 41; });
1234 fm.loopUntilNoReady();
1235 EXPECT_FALSE(fm.hasTasks());
1238 TEST(FiberManager, yieldTest) {
1239 FiberManager manager(folly::make_unique<SimpleLoopController>());
1240 auto& loopController =
1241 dynamic_cast<SimpleLoopController&>(manager.loopController());
1243 bool checkRan = false;
1245 manager.addTask([&]() {
1250 loopController.loop([&]() {
1252 loopController.stop();
1256 EXPECT_TRUE(checkRan);
1259 TEST(FiberManager, RequestContext) {
1260 FiberManager fm(folly::make_unique<SimpleLoopController>());
1262 bool checkRun1 = false;
1263 bool checkRun2 = false;
1264 bool checkRun3 = false;
1265 bool checkRun4 = false;
1266 folly::fibers::Baton baton1;
1267 folly::fibers::Baton baton2;
1268 folly::fibers::Baton baton3;
1269 folly::fibers::Baton baton4;
1272 folly::RequestContextScopeGuard rctx;
1273 auto rcontext1 = folly::RequestContext::get();
1274 fm.addTask([&, rcontext1]() {
1275 EXPECT_EQ(rcontext1, folly::RequestContext::get());
1277 [&]() { EXPECT_EQ(rcontext1, folly::RequestContext::get()); });
1278 EXPECT_EQ(rcontext1, folly::RequestContext::get());
1280 [&]() { EXPECT_EQ(rcontext1, folly::RequestContext::get()); });
1285 folly::RequestContextScopeGuard rctx;
1286 auto rcontext2 = folly::RequestContext::get();
1287 fm.addTaskRemote([&, rcontext2]() {
1288 EXPECT_EQ(rcontext2, folly::RequestContext::get());
1290 EXPECT_EQ(rcontext2, folly::RequestContext::get());
1295 folly::RequestContextScopeGuard rctx;
1296 auto rcontext3 = folly::RequestContext::get();
1299 EXPECT_EQ(rcontext3, folly::RequestContext::get());
1301 EXPECT_EQ(rcontext3, folly::RequestContext::get());
1303 return folly::Unit();
1305 [&, rcontext3](Try<folly::Unit>&& /* t */) {
1306 EXPECT_EQ(rcontext3, folly::RequestContext::get());
1311 folly::RequestContext::setContext(nullptr);
1313 folly::RequestContextScopeGuard rctx;
1314 auto rcontext4 = folly::RequestContext::get();
1316 EXPECT_EQ(rcontext4, folly::RequestContext::get());
1321 folly::RequestContextScopeGuard rctx;
1322 auto rcontext = folly::RequestContext::get();
1324 fm.loopUntilNoReady();
1325 EXPECT_EQ(rcontext, folly::RequestContext::get());
1328 EXPECT_EQ(rcontext, folly::RequestContext::get());
1329 fm.loopUntilNoReady();
1330 EXPECT_TRUE(checkRun1);
1331 EXPECT_EQ(rcontext, folly::RequestContext::get());
1334 EXPECT_EQ(rcontext, folly::RequestContext::get());
1335 fm.loopUntilNoReady();
1336 EXPECT_TRUE(checkRun2);
1337 EXPECT_EQ(rcontext, folly::RequestContext::get());
1340 EXPECT_EQ(rcontext, folly::RequestContext::get());
1341 fm.loopUntilNoReady();
1342 EXPECT_TRUE(checkRun3);
1343 EXPECT_EQ(rcontext, folly::RequestContext::get());
1346 EXPECT_EQ(rcontext, folly::RequestContext::get());
1347 fm.loopUntilNoReady();
1348 EXPECT_TRUE(checkRun4);
1349 EXPECT_EQ(rcontext, folly::RequestContext::get());
1353 TEST(FiberManager, resizePeriodically) {
1354 FiberManager::Options opts;
1355 opts.fibersPoolResizePeriodMs = 300;
1356 opts.maxFibersPoolSize = 5;
1358 FiberManager manager(folly::make_unique<EventBaseLoopController>(), opts);
1360 folly::EventBase evb;
1361 dynamic_cast<EventBaseLoopController&>(manager.loopController())
1362 .attachEventBase(evb);
1364 std::vector<Baton> batons(10);
1366 size_t tasksRun = 0;
1367 for (size_t i = 0; i < 30; ++i) {
1368 manager.addTask([i, &batons, &tasksRun]() {
1370 // Keep some fibers active indefinitely
1371 if (i < batons.size()) {
1377 EXPECT_EQ(0, tasksRun);
1378 EXPECT_EQ(30, manager.fibersAllocated());
1379 EXPECT_EQ(0, manager.fibersPoolSize());
1382 EXPECT_EQ(30, tasksRun);
1383 EXPECT_EQ(30, manager.fibersAllocated());
1384 // Can go over maxFibersPoolSize, 10 of 30 fibers still active
1385 EXPECT_EQ(20, manager.fibersPoolSize());
1387 std::this_thread::sleep_for(std::chrono::milliseconds(400));
1388 evb.loopOnce(); // no fibers active in this period
1389 EXPECT_EQ(30, manager.fibersAllocated());
1390 EXPECT_EQ(20, manager.fibersPoolSize());
1392 std::this_thread::sleep_for(std::chrono::milliseconds(400));
1393 evb.loopOnce(); // should shrink fibers pool to maxFibersPoolSize
1394 EXPECT_EQ(15, manager.fibersAllocated());
1395 EXPECT_EQ(5, manager.fibersPoolSize());
1397 for (size_t i = 0; i < batons.size(); ++i) {
1401 EXPECT_EQ(15, manager.fibersAllocated());
1402 EXPECT_EQ(15, manager.fibersPoolSize());
1404 std::this_thread::sleep_for(std::chrono::milliseconds(400));
1405 evb.loopOnce(); // 10 fibers active in last period
1406 EXPECT_EQ(10, manager.fibersAllocated());
1407 EXPECT_EQ(10, manager.fibersPoolSize());
1409 std::this_thread::sleep_for(std::chrono::milliseconds(400));
1411 EXPECT_EQ(5, manager.fibersAllocated());
1412 EXPECT_EQ(5, manager.fibersPoolSize());
1415 TEST(FiberManager, batonWaitTimeoutHandler) {
1416 FiberManager manager(folly::make_unique<EventBaseLoopController>());
1418 folly::EventBase evb;
1419 dynamic_cast<EventBaseLoopController&>(manager.loopController())
1420 .attachEventBase(evb);
1422 size_t fibersRun = 0;
1424 Baton::TimeoutHandler timeoutHandler;
1426 manager.addTask([&]() {
1427 baton.wait(timeoutHandler);
1430 manager.loopUntilNoReady();
1432 EXPECT_FALSE(baton.try_wait());
1433 EXPECT_EQ(0, fibersRun);
1435 timeoutHandler.scheduleTimeout(std::chrono::milliseconds(250));
1436 std::this_thread::sleep_for(std::chrono::milliseconds(500));
1438 EXPECT_FALSE(baton.try_wait());
1439 EXPECT_EQ(0, fibersRun);
1442 manager.loopUntilNoReady();
1444 EXPECT_EQ(1, fibersRun);
1447 TEST(FiberManager, batonWaitTimeoutMany) {
1448 FiberManager manager(folly::make_unique<EventBaseLoopController>());
1450 folly::EventBase evb;
1451 dynamic_cast<EventBaseLoopController&>(manager.loopController())
1452 .attachEventBase(evb);
1454 constexpr size_t kNumTimeoutTasks = 10000;
1455 size_t tasksCount = kNumTimeoutTasks;
1457 // We add many tasks to hit timeout queue deallocation logic.
1458 for (size_t i = 0; i < kNumTimeoutTasks; ++i) {
1459 manager.addTask([&]() {
1461 Baton::TimeoutHandler timeoutHandler;
1463 folly::fibers::addTask([&] {
1464 timeoutHandler.scheduleTimeout(std::chrono::milliseconds(1000));
1467 baton.wait(timeoutHandler);
1468 if (--tasksCount == 0) {
1469 evb.terminateLoopSoon();
1477 TEST(FiberManager, remoteFutureTest) {
1478 FiberManager fiberManager(folly::make_unique<SimpleLoopController>());
1479 auto& loopController =
1480 dynamic_cast<SimpleLoopController&>(fiberManager.loopController());
1484 auto f1 = fiberManager.addTaskFuture([&]() { return testValue1; });
1485 auto f2 = fiberManager.addTaskRemoteFuture([&]() { return testValue2; });
1486 loopController.loop([&]() { loopController.stop(); });
1490 EXPECT_EQ(v1, testValue1);
1491 EXPECT_EQ(v2, testValue2);
1494 // Test that a void function produes a Future<Unit>.
1495 TEST(FiberManager, remoteFutureVoidUnitTest) {
1496 FiberManager fiberManager(folly::make_unique<SimpleLoopController>());
1497 auto& loopController =
1498 dynamic_cast<SimpleLoopController&>(fiberManager.loopController());
1500 bool ranLocal = false;
1501 folly::Future<folly::Unit> futureLocal =
1502 fiberManager.addTaskFuture([&]() { ranLocal = true; });
1504 bool ranRemote = false;
1505 folly::Future<folly::Unit> futureRemote =
1506 fiberManager.addTaskRemoteFuture([&]() { ranRemote = true; });
1508 loopController.loop([&]() { loopController.stop(); });
1511 ASSERT_TRUE(ranLocal);
1513 futureRemote.wait();
1514 ASSERT_TRUE(ranRemote);
1517 TEST(FiberManager, nestedFiberManagers) {
1518 folly::EventBase outerEvb;
1519 folly::EventBase innerEvb;
1521 getFiberManager(outerEvb).addTask([&]() {
1523 &getFiberManager(outerEvb), FiberManager::getFiberManagerUnsafe());
1525 runInMainContext([&]() {
1526 getFiberManager(innerEvb).addTask([&]() {
1528 &getFiberManager(innerEvb), FiberManager::getFiberManagerUnsafe());
1530 innerEvb.terminateLoopSoon();
1533 innerEvb.loopForever();
1537 &getFiberManager(outerEvb), FiberManager::getFiberManagerUnsafe());
1539 outerEvb.terminateLoopSoon();
1542 outerEvb.loopForever();
1545 TEST(FiberManager, semaphore) {
1546 constexpr size_t kTasks = 10;
1547 constexpr size_t kIterations = 10000;
1548 constexpr size_t kNumTokens = 10;
1550 Semaphore sem(kNumTokens);
1554 auto task = [&sem, kTasks, kIterations, kNumTokens](
1555 int& counter, folly::fibers::Baton& baton) {
1556 FiberManager manager(folly::make_unique<EventBaseLoopController>());
1557 folly::EventBase evb;
1558 dynamic_cast<EventBaseLoopController&>(manager.loopController())
1559 .attachEventBase(evb);
1562 std::shared_ptr<folly::EventBase> completionCounter(
1563 &evb, [](folly::EventBase* evb) { evb->terminateLoopSoon(); });
1565 for (size_t i = 0; i < kTasks; ++i) {
1566 manager.addTask([&, completionCounter]() {
1567 for (size_t j = 0; j < kIterations; ++j) {
1573 EXPECT_LT(counter, kNumTokens);
1574 EXPECT_GE(counter, 0);
1584 folly::fibers::Baton batonA;
1585 folly::fibers::Baton batonB;
1586 std::thread threadA([&] { task(counterA, batonA); });
1587 std::thread threadB([&] { task(counterB, batonB); });
1594 EXPECT_LT(counterA, kNumTokens);
1595 EXPECT_LT(counterB, kNumTokens);
1596 EXPECT_GE(counterA, 0);
1597 EXPECT_GE(counterB, 0);
1600 template <typename ExecutorT>
1601 void singleBatchDispatch(ExecutorT& executor, int batchSize, int index) {
1602 thread_local BatchDispatcher<int, std::string, ExecutorT> batchDispatcher(
1603 executor, [=](std::vector<int>&& batch) {
1604 EXPECT_EQ(batchSize, batch.size());
1605 std::vector<std::string> results;
1606 for (auto& it : batch) {
1607 results.push_back(folly::to<std::string>(it));
1612 auto indexCopy = index;
1613 auto result = batchDispatcher.add(std::move(indexCopy));
1614 EXPECT_EQ(folly::to<std::string>(index), result.get());
1617 TEST(FiberManager, batchDispatchTest) {
1618 folly::EventBase evb;
1619 auto& executor = getFiberManager(evb);
1621 // Launch multiple fibers with a single id.
1622 executor.add([&]() {
1624 for (int i = 0; i < batchSize; i++) {
1626 [=, &executor]() { singleBatchDispatch(executor, batchSize, i); });
1631 // Reuse the same BatchDispatcher to batch once again.
1632 executor.add([&]() {
1634 for (int i = 0; i < batchSize; i++) {
1636 [=, &executor]() { singleBatchDispatch(executor, batchSize, i); });
1642 template <typename ExecutorT>
1643 folly::Future<std::vector<std::string>> doubleBatchInnerDispatch(
1644 ExecutorT& executor,
1645 int totalNumberOfElements,
1646 std::vector<int> input) {
1647 thread_local BatchDispatcher<
1649 std::vector<std::string>,
1651 batchDispatcher(executor, [=](std::vector<std::vector<int>>&& batch) {
1652 std::vector<std::vector<std::string>> results;
1653 int numberOfElements = 0;
1654 for (auto& unit : batch) {
1655 numberOfElements += unit.size();
1656 std::vector<std::string> result;
1657 for (auto& element : unit) {
1658 result.push_back(folly::to<std::string>(element));
1660 results.push_back(std::move(result));
1662 EXPECT_EQ(totalNumberOfElements, numberOfElements);
1666 return batchDispatcher.add(std::move(input));
1670 * Batch values in groups of 5, and then call inner dispatch.
1672 template <typename ExecutorT>
1673 void doubleBatchOuterDispatch(
1674 ExecutorT& executor,
1675 int totalNumberOfElements,
1677 thread_local BatchDispatcher<int, std::string, ExecutorT>
1678 batchDispatcher(executor, [=, &executor](std::vector<int>&& batch) {
1679 EXPECT_EQ(totalNumberOfElements, batch.size());
1680 std::vector<std::string> results;
1681 std::vector<folly::Future<std::vector<std::string>>>
1682 innerDispatchResultFutures;
1684 std::vector<int> group;
1685 for (auto unit : batch) {
1686 group.push_back(unit);
1687 if (group.size() == 5) {
1688 auto localGroup = group;
1691 innerDispatchResultFutures.push_back(doubleBatchInnerDispatch(
1692 executor, totalNumberOfElements, localGroup));
1697 innerDispatchResultFutures.begin(), innerDispatchResultFutures.end())
1699 std::vector<Try<std::vector<std::string>>> innerDispatchResults) {
1700 for (auto& unit : innerDispatchResults) {
1701 for (auto& element : unit.value()) {
1702 results.push_back(element);
1710 auto indexCopy = index;
1711 auto result = batchDispatcher.add(std::move(indexCopy));
1712 EXPECT_EQ(folly::to<std::string>(index), result.get());
1715 TEST(FiberManager, doubleBatchDispatchTest) {
1716 folly::EventBase evb;
1717 auto& executor = getFiberManager(evb);
1719 // Launch multiple fibers with a single id.
1720 executor.add([&]() {
1721 int totalNumberOfElements = 20;
1722 for (int i = 0; i < totalNumberOfElements; i++) {
1723 executor.add([=, &executor]() {
1724 doubleBatchOuterDispatch(executor, totalNumberOfElements, i);
1731 template <typename ExecutorT>
1732 void batchDispatchExceptionHandling(ExecutorT& executor, int i) {
1733 thread_local BatchDispatcher<int, int, ExecutorT> batchDispatcher(
1734 executor, [=, &executor](std::vector<int> &&) -> std::vector<int> {
1735 throw std::runtime_error("Surprise!!");
1738 EXPECT_THROW(batchDispatcher.add(i).get(), std::runtime_error);
1741 TEST(FiberManager, batchDispatchExceptionHandlingTest) {
1742 folly::EventBase evb;
1743 auto& executor = getFiberManager(evb);
1745 // Launch multiple fibers with a single id.
1746 executor.add([&]() {
1747 int totalNumberOfElements = 5;
1748 for (int i = 0; i < totalNumberOfElements; i++) {
1750 [=, &executor]() { batchDispatchExceptionHandling(executor, i); });
1757 * Test that we can properly track fiber stack usage.
1759 * This functionality can only be enabled when ASAN is disabled, so avoid
1760 * running this test with ASAN.
1762 #ifndef FOLLY_SANITIZE_ADDRESS
1763 TEST(FiberManager, recordStack) {
1765 folly::fibers::FiberManager::Options opts;
1766 opts.recordStackEvery = 1;
1768 FiberManager fm(folly::make_unique<SimpleLoopController>(), opts);
1769 auto& loopController =
1770 dynamic_cast<SimpleLoopController&>(fm.loopController());
1772 constexpr size_t n = 1000;
1776 for (size_t i = 0; i < n; ++i) {
1779 for (size_t i = 0; i + 1 < n; ++i) {
1780 s += b[i] * b[i + 1];
1786 loopController.loop([&]() { loopController.stop(); });
1788 // Check that we properly accounted fiber stack usage.
1789 EXPECT_LT(n * sizeof(int), fm.stackHighWatermark());