2 * Copyright 2016 Facebook, Inc.
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
8 * http://www.apache.org/licenses/LICENSE-2.0
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
20 #include <folly/Memory.h>
21 #include <folly/Random.h>
22 #include <folly/futures/Future.h>
24 #include <folly/Conv.h>
25 #include <folly/fibers/AddTasks.h>
26 #include <folly/fibers/AtomicBatchDispatcher.h>
27 #include <folly/fibers/BatchDispatcher.h>
28 #include <folly/fibers/EventBaseLoopController.h>
29 #include <folly/fibers/FiberManager.h>
30 #include <folly/fibers/FiberManagerMap.h>
31 #include <folly/fibers/GenericBaton.h>
32 #include <folly/fibers/Semaphore.h>
33 #include <folly/fibers/SimpleLoopController.h>
34 #include <folly/fibers/WhenN.h>
35 #include <folly/io/async/ScopedEventBaseThread.h>
36 #include <folly/portability/GTest.h>
38 using namespace folly::fibers;
// Exercises baton.timed_wait() timing out on fibers driven by a
// SimpleLoopController. Two tasks are visible: one waits 230 ms and expects
// `iterations` to be 5 afterwards, the other waits 130 ms and expects 3.
// A 50 ms sleep is also visible in the test (presumably one per loop pass,
// which would make `iterations` a coarse clock - confirm).
42 TEST(FiberManager, batonTimedWaitTimeout) {
43 bool taskAdded = false;
44 size_t iterations = 0;
46 FiberManager manager(folly::make_unique<SimpleLoopController>());
47 auto& loopController =
48 dynamic_cast<SimpleLoopController&>(manager.loopController());
50 auto loopFunc = [&]() {
52 manager.addTask([&]() {
55 auto res = baton.timed_wait(std::chrono::milliseconds(230));
58 EXPECT_EQ(5, iterations);
60 loopController.stop();
62 manager.addTask([&]() {
65 auto res = baton.timed_wait(std::chrono::milliseconds(130));
68 EXPECT_EQ(3, iterations);
70 loopController.stop();
74 std::this_thread::sleep_for(std::chrono::milliseconds(50));
79 loopController.loop(std::move(loopFunc));
// Exercises baton.timed_wait(130 ms) under a SimpleLoopController in the case
// where the wait is expected to finish at iteration 2. A branch keyed on
// `iterations == 2` is visible (presumably where the baton is posted before
// the timeout expires - confirm).
82 TEST(FiberManager, batonTimedWaitPost) {
83 bool taskAdded = false;
84 size_t iterations = 0;
87 FiberManager manager(folly::make_unique<SimpleLoopController>());
88 auto& loopController =
89 dynamic_cast<SimpleLoopController&>(manager.loopController());
91 auto loopFunc = [&]() {
93 manager.addTask([&]() {
97 auto res = baton.timed_wait(std::chrono::milliseconds(130));
100 EXPECT_EQ(2, iterations);
102 loopController.stop();
106 std::this_thread::sleep_for(std::chrono::milliseconds(50));
108 if (iterations == 2) {
114 loopController.loop(std::move(loopFunc));
// Timed-wait timeout test on an EventBaseLoopController attached to a local
// folly::EventBase. Two tasks wait 500 ms and 250 ms; each measures the wait
// with EventBaseLoopController::Clock and expects it to fall strictly within
// 50 ms of the requested timeout. The second task to complete asks the event
// base to terminate, and the test finally expects both tasks to have finished.
117 TEST(FiberManager, batonTimedWaitTimeoutEvb) {
118 size_t tasksComplete = 0;
120 folly::EventBase evb;
122 FiberManager manager(folly::make_unique<EventBaseLoopController>());
123 dynamic_cast<EventBaseLoopController&>(manager.loopController())
124 .attachEventBase(evb);
126 auto task = [&](size_t timeout_ms) {
129 auto start = EventBaseLoopController::Clock::now();
130 auto res = baton.timed_wait(std::chrono::milliseconds(timeout_ms));
131 auto finish = EventBaseLoopController::Clock::now();
136 std::chrono::duration_cast<std::chrono::milliseconds>(finish - start);
138 EXPECT_GT(duration_ms.count(), timeout_ms - 50);
139 EXPECT_LT(duration_ms.count(), timeout_ms + 50);
141 if (++tasksComplete == 2) {
142 evb.terminateLoopSoon();
146 evb.runInEventBaseThread([&]() {
147 manager.addTask([&]() { task(500); });
148 manager.addTask([&]() { task(250); });
153 EXPECT_EQ(2, tasksComplete);
// Timed-wait test on an EventBase where the baton is posted before the
// timeout: baton.post() is scheduled through evb.tryRunAfterDelay() with a
// delay of 100 while the fiber waits up to 130 ms. The measured wait must be
// strictly between 95 and 110 ms, and exactly one task must complete.
156 TEST(FiberManager, batonTimedWaitPostEvb) {
157 size_t tasksComplete = 0;
159 folly::EventBase evb;
161 FiberManager manager(folly::make_unique<EventBaseLoopController>());
162 dynamic_cast<EventBaseLoopController&>(manager.loopController())
163 .attachEventBase(evb);
165 evb.runInEventBaseThread([&]() {
166 manager.addTask([&]() {
169 evb.tryRunAfterDelay([&]() { baton.post(); }, 100);
171 auto start = EventBaseLoopController::Clock::now();
172 auto res = baton.timed_wait(std::chrono::milliseconds(130));
173 auto finish = EventBaseLoopController::Clock::now();
178 std::chrono::duration_cast<std::chrono::milliseconds>(finish - start);
180 EXPECT_TRUE(duration_ms.count() > 95 && duration_ms.count() < 110);
182 if (++tasksComplete == 1) {
183 evb.terminateLoopSoon();
190 EXPECT_EQ(1, tasksComplete);
// Covers try_wait() in two scenarios on a SimpleLoopController-backed manager:
//  1. a fiber polls b.try_wait() while a helper thread sleeps 300 ms
//     (presumably posting afterwards, per the "try_wait and post" comment);
//  2. a fiber polls c.try_wait() with no post at all, bounded by `cnt`, and
//     then asserts that try_wait() still reports false.
193 TEST(FiberManager, batonTryWait) {
194 FiberManager manager(folly::make_unique<SimpleLoopController>());
196 // Check if try_wait and post work as expected
199 manager.addTask([&]() {
200 while (!b.try_wait()) {
203 auto thr = std::thread([&]() {
204 std::this_thread::sleep_for(std::chrono::milliseconds(300));
208 manager.loopUntilNoReady();
213 // Check try_wait without post
214 manager.addTask([&]() {
216 while (cnt && !c.try_wait()) {
219 EXPECT_TRUE(!c.try_wait()); // must still hold
223 manager.loopUntilNoReady();
// Tracks a waiting fiber through the `fiberRunning` flag: it is expected to be
// false before the first loopUntilNoReady() and true right after it (the fiber
// is still active). A helper thread that sleeps 300 ms is then started, and
// the test keeps calling loopUntilNoReady() for as long as the flag stays set;
// the fiber clears the flag when it finishes.
226 TEST(FiberManager, genericBatonFiberWait) {
227 FiberManager manager(folly::make_unique<SimpleLoopController>());
230 bool fiberRunning = false;
232 manager.addTask([&]() {
233 EXPECT_EQ(manager.hasActiveFiber(), true);
236 fiberRunning = false;
239 EXPECT_FALSE(fiberRunning);
240 manager.loopUntilNoReady();
241 EXPECT_TRUE(fiberRunning); // ensure fiber still active
243 auto thr = std::thread([&]() {
244 std::this_thread::sleep_for(std::chrono::milliseconds(300));
248 while (fiberRunning) {
249 manager.loopUntilNoReady();
// Counterpart of the fiber-wait test with a plain thread as the waiter. The
// helper thread raises the atomic `threadWaiting` flag and later lowers it.
// The test spins until the flag is raised, sleeps 300 ms, then adds a fiber
// task which expects to run on an active fiber while the thread is still
// waiting. A final loop on `threadWaiting` is visible before the manager runs.
255 TEST(FiberManager, genericBatonThreadWait) {
256 FiberManager manager(folly::make_unique<SimpleLoopController>());
258 std::atomic<bool> threadWaiting(false);
260 auto thr = std::thread([&]() {
261 threadWaiting = true;
263 threadWaiting = false;
266 while (!threadWaiting) {
268 std::this_thread::sleep_for(std::chrono::milliseconds(300));
270 manager.addTask([&]() {
271 EXPECT_EQ(manager.hasActiveFiber(), true);
272 EXPECT_TRUE(threadWaiting);
274 while (threadWaiting) {
278 manager.loopUntilNoReady();
// addTasks() with functions whose result type is std::unique_ptr<int>, i.e. a
// move-only value. Each of the three functions parks its Promise<int> in
// `pendingFibers` through await() and returns 2*i+1. Parked promises are
// fulfilled one at a time from the back of the vector, and every awaited
// result is checked against 2*taskID+1. The loop stops once no promise is
// left pending.
282 TEST(FiberManager, addTasksNoncopyable) {
283 std::vector<Promise<int>> pendingFibers;
284 bool taskAdded = false;
286 FiberManager manager(folly::make_unique<SimpleLoopController>());
287 auto& loopController =
288 dynamic_cast<SimpleLoopController&>(manager.loopController());
290 auto loopFunc = [&]() {
292 manager.addTask([&]() {
293 std::vector<std::function<std::unique_ptr<int>()>> funcs;
294 for (size_t i = 0; i < 3; ++i) {
295 funcs.push_back([i, &pendingFibers]() {
296 await([&pendingFibers](Promise<int> promise) {
297 pendingFibers.push_back(std::move(promise));
299 return folly::make_unique<int>(i * 2 + 1);
303 auto iter = addTasks(funcs.begin(), funcs.end());
306 while (iter.hasNext()) {
307 auto result = iter.awaitNext();
308 EXPECT_EQ(2 * iter.getTaskID() + 1, *result);
309 EXPECT_GE(2 - n, pendingFibers.size());
315 } else if (pendingFibers.size()) {
316 pendingFibers.back().setValue(0);
317 pendingFibers.pop_back();
319 loopController.stop();
323 loopController.loop(std::move(loopFunc));
// Checks await() when the callback passed to it throws ExpectedException.
// Two variants are visible: one throws straight from the callback, the other
// first moves the promise into a closure handed to evb.runInEventBaseThread()
// and then throws.
326 TEST(FiberManager, awaitThrow) {
327 folly::EventBase evb;
328 struct ExpectedException {};
332 await([](Promise<int> p) {
334 throw ExpectedException();
340 await([&](Promise<int> p) {
341 evb.runInEventBaseThread([p = std::move(p)]() mutable {
344 throw ExpectedException();
// addTasks() with int-returning functions, where a std::runtime_error("Runtime")
// throw is visible inside the task body. Expectations split by task ID parity:
// odd IDs must yield 2*id+1 from awaitNext(), while even IDs are checked in a
// separate path (presumably the exception handler - confirm). Parked promises
// are released from the back of `pendingFibers` one at a time.
351 TEST(FiberManager, addTasksThrow) {
352 std::vector<Promise<int>> pendingFibers;
353 bool taskAdded = false;
355 FiberManager manager(folly::make_unique<SimpleLoopController>());
356 auto& loopController =
357 dynamic_cast<SimpleLoopController&>(manager.loopController());
359 auto loopFunc = [&]() {
361 manager.addTask([&]() {
362 std::vector<std::function<int()>> funcs;
363 for (size_t i = 0; i < 3; ++i) {
364 funcs.push_back([i, &pendingFibers]() {
365 await([&pendingFibers](Promise<int> promise) {
366 pendingFibers.push_back(std::move(promise));
369 throw std::runtime_error("Runtime");
375 auto iter = addTasks(funcs.begin(), funcs.end());
378 while (iter.hasNext()) {
380 int result = iter.awaitNext();
381 EXPECT_EQ(1, iter.getTaskID() % 2);
382 EXPECT_EQ(2 * iter.getTaskID() + 1, result);
384 EXPECT_EQ(0, iter.getTaskID() % 2);
386 EXPECT_GE(2 - n, pendingFibers.size());
392 } else if (pendingFibers.size()) {
393 pendingFibers.back().setValue(0);
394 pendingFibers.pop_back();
396 loopController.stop();
400 loopController.loop(std::move(loopFunc));
// addTasks() with void-returning functions. Each function parks a Promise<int>
// in `pendingFibers` via await(); while iterating over the results the test
// checks that the number of still-parked promises does not exceed 2 - n.
// Parked promises are released from the back of the vector one at a time.
403 TEST(FiberManager, addTasksVoid) {
404 std::vector<Promise<int>> pendingFibers;
405 bool taskAdded = false;
407 FiberManager manager(folly::make_unique<SimpleLoopController>());
408 auto& loopController =
409 dynamic_cast<SimpleLoopController&>(manager.loopController());
411 auto loopFunc = [&]() {
413 manager.addTask([&]() {
414 std::vector<std::function<void()>> funcs;
415 for (size_t i = 0; i < 3; ++i) {
416 funcs.push_back([i, &pendingFibers]() {
417 await([&pendingFibers](Promise<int> promise) {
418 pendingFibers.push_back(std::move(promise));
423 auto iter = addTasks(funcs.begin(), funcs.end());
426 while (iter.hasNext()) {
428 EXPECT_GE(2 - n, pendingFibers.size());
434 } else if (pendingFibers.size()) {
435 pendingFibers.back().setValue(0);
436 pendingFibers.pop_back();
438 loopController.stop();
442 loopController.loop(std::move(loopFunc));
// addTasks() with void-returning functions where a std::runtime_error("")
// throw is visible inside the task body. As in the int-returning variant, the
// expectations are split by task ID parity (odd in one path, even in the
// other), and the number of parked promises must not exceed 2 - n.
445 TEST(FiberManager, addTasksVoidThrow) {
446 std::vector<Promise<int>> pendingFibers;
447 bool taskAdded = false;
449 FiberManager manager(folly::make_unique<SimpleLoopController>());
450 auto& loopController =
451 dynamic_cast<SimpleLoopController&>(manager.loopController());
453 auto loopFunc = [&]() {
455 manager.addTask([&]() {
456 std::vector<std::function<void()>> funcs;
457 for (size_t i = 0; i < 3; ++i) {
458 funcs.push_back([i, &pendingFibers]() {
459 await([&pendingFibers](Promise<int> promise) {
460 pendingFibers.push_back(std::move(promise));
463 throw std::runtime_error("");
468 auto iter = addTasks(funcs.begin(), funcs.end());
471 while (iter.hasNext()) {
474 EXPECT_EQ(1, iter.getTaskID() % 2);
476 EXPECT_EQ(0, iter.getTaskID() % 2);
478 EXPECT_GE(2 - n, pendingFibers.size());
484 } else if (pendingFibers.size()) {
485 pendingFibers.back().setValue(0);
486 pendingFibers.pop_back();
488 loopController.stop();
492 loopController.loop(std::move(loopFunc));
// Walks the state predicates of the iterator returned by addTasks() over three
// void tasks. Four checkpoints are visible, in order, as
// (hasCompleted, hasPending, hasNext):
//   (true, true, true), (true, true, true), (false, true, true),
//   (false, false, false).
// NOTE(review): the calls made between checkpoints are not visible here.
495 TEST(FiberManager, addTasksReserve) {
496 std::vector<Promise<int>> pendingFibers;
497 bool taskAdded = false;
499 FiberManager manager(folly::make_unique<SimpleLoopController>());
500 auto& loopController =
501 dynamic_cast<SimpleLoopController&>(manager.loopController());
503 auto loopFunc = [&]() {
505 manager.addTask([&]() {
506 std::vector<std::function<void()>> funcs;
507 for (size_t i = 0; i < 3; ++i) {
508 funcs.push_back([&pendingFibers]() {
509 await([&pendingFibers](Promise<int> promise) {
510 pendingFibers.push_back(std::move(promise));
515 auto iter = addTasks(funcs.begin(), funcs.end());
518 EXPECT_TRUE(iter.hasCompleted());
519 EXPECT_TRUE(iter.hasPending());
520 EXPECT_TRUE(iter.hasNext());
523 EXPECT_TRUE(iter.hasCompleted());
524 EXPECT_TRUE(iter.hasPending());
525 EXPECT_TRUE(iter.hasNext());
528 EXPECT_FALSE(iter.hasCompleted());
529 EXPECT_TRUE(iter.hasPending());
530 EXPECT_TRUE(iter.hasNext());
533 EXPECT_FALSE(iter.hasCompleted());
534 EXPECT_FALSE(iter.hasPending());
535 EXPECT_FALSE(iter.hasNext());
538 } else if (pendingFibers.size()) {
539 pendingFibers.back().setValue(0);
540 pendingFibers.pop_back();
542 loopController.stop();
546 loopController.loop(std::move(loopFunc));
// Adds tasks to a TaskIterator<size_t> incrementally rather than all up front:
// tasks 0 and 1 first, task 2 after the first result has been awaited. Each
// task blocks on batons[taskId]; awaitNext() is expected to yield 1, then 2,
// then 0 (completion order is presumably controlled by posting the batons in
// that order - confirm).
549 TEST(FiberManager, addTaskDynamic) {
550 folly::EventBase evb;
554 auto makeTask = [&](size_t taskId) {
555 return [&, taskId]() -> size_t {
556 batons[taskId].wait();
562 .addTaskFuture([&]() {
563 TaskIterator<size_t> iterator;
565 iterator.addTask(makeTask(0));
566 iterator.addTask(makeTask(1));
570 EXPECT_EQ(1, iterator.awaitNext());
572 iterator.addTask(makeTask(2));
576 EXPECT_EQ(2, iterator.awaitNext());
580 EXPECT_EQ(0, iterator.awaitNext());
// forEach() over three int-returning functions. The callback records every
// (id, result) pair; afterwards the test expects exactly three results, no
// promise left parked, and result == id*2+1 for each recorded pair.
585 TEST(FiberManager, forEach) {
586 std::vector<Promise<int>> pendingFibers;
587 bool taskAdded = false;
589 FiberManager manager(folly::make_unique<SimpleLoopController>());
590 auto& loopController =
591 dynamic_cast<SimpleLoopController&>(manager.loopController());
593 auto loopFunc = [&]() {
595 manager.addTask([&]() {
596 std::vector<std::function<int()>> funcs;
597 for (size_t i = 0; i < 3; ++i) {
598 funcs.push_back([i, &pendingFibers]() {
599 await([&pendingFibers](Promise<int> promise) {
600 pendingFibers.push_back(std::move(promise));
606 std::vector<std::pair<size_t, int>> results;
607 forEach(funcs.begin(), funcs.end(), [&results](size_t id, int result) {
608 results.emplace_back(id, result);
610 EXPECT_EQ(3, results.size());
611 EXPECT_TRUE(pendingFibers.empty());
612 for (size_t i = 0; i < 3; ++i) {
613 EXPECT_EQ(results[i].first * 2 + 1, results[i].second);
617 } else if (pendingFibers.size()) {
618 pendingFibers.back().setValue(0);
619 pendingFibers.pop_back();
621 loopController.stop();
625 loopController.loop(std::move(loopFunc));
// collectN() asking for 2 of 3 int-returning tasks. Expects exactly two
// results, one promise still parked when collectN() returns, and each result
// pair to satisfy second == first*2+1.
628 TEST(FiberManager, collectN) {
629 std::vector<Promise<int>> pendingFibers;
630 bool taskAdded = false;
632 FiberManager manager(folly::make_unique<SimpleLoopController>());
633 auto& loopController =
634 dynamic_cast<SimpleLoopController&>(manager.loopController());
636 auto loopFunc = [&]() {
638 manager.addTask([&]() {
639 std::vector<std::function<int()>> funcs;
640 for (size_t i = 0; i < 3; ++i) {
641 funcs.push_back([i, &pendingFibers]() {
642 await([&pendingFibers](Promise<int> promise) {
643 pendingFibers.push_back(std::move(promise));
649 auto results = collectN(funcs.begin(), funcs.end(), 2);
650 EXPECT_EQ(2, results.size());
651 EXPECT_EQ(1, pendingFibers.size());
652 for (size_t i = 0; i < 2; ++i) {
653 EXPECT_EQ(results[i].first * 2 + 1, results[i].second);
657 } else if (pendingFibers.size()) {
658 pendingFibers.back().setValue(0);
659 pendingFibers.pop_back();
661 loopController.stop();
665 loopController.loop(std::move(loopFunc));
// collectN() asking for 2 of 3 int-returning tasks whose bodies throw
// std::runtime_error("Runtime") after parking a promise. The visible
// expectation is that one promise is still parked afterwards.
// NOTE(review): how the thrown exception is observed is not visible here.
668 TEST(FiberManager, collectNThrow) {
669 std::vector<Promise<int>> pendingFibers;
670 bool taskAdded = false;
672 FiberManager manager(folly::make_unique<SimpleLoopController>());
673 auto& loopController =
674 dynamic_cast<SimpleLoopController&>(manager.loopController());
676 auto loopFunc = [&]() {
678 manager.addTask([&]() {
679 std::vector<std::function<int()>> funcs;
680 for (size_t i = 0; i < 3; ++i) {
681 funcs.push_back([i, &pendingFibers]() {
682 await([&pendingFibers](Promise<int> promise) {
683 pendingFibers.push_back(std::move(promise));
685 throw std::runtime_error("Runtime");
691 collectN(funcs.begin(), funcs.end(), 2);
693 EXPECT_EQ(1, pendingFibers.size());
697 } else if (pendingFibers.size()) {
698 pendingFibers.back().setValue(0);
699 pendingFibers.pop_back();
701 loopController.stop();
705 loopController.loop(std::move(loopFunc));
// collectN() asking for 2 of 3 void-returning tasks. Expects a result
// container of size two and one promise still parked when collectN() returns.
708 TEST(FiberManager, collectNVoid) {
709 std::vector<Promise<int>> pendingFibers;
710 bool taskAdded = false;
712 FiberManager manager(folly::make_unique<SimpleLoopController>());
713 auto& loopController =
714 dynamic_cast<SimpleLoopController&>(manager.loopController());
716 auto loopFunc = [&]() {
718 manager.addTask([&]() {
719 std::vector<std::function<void()>> funcs;
720 for (size_t i = 0; i < 3; ++i) {
721 funcs.push_back([i, &pendingFibers]() {
722 await([&pendingFibers](Promise<int> promise) {
723 pendingFibers.push_back(std::move(promise));
728 auto results = collectN(funcs.begin(), funcs.end(), 2);
729 EXPECT_EQ(2, results.size());
730 EXPECT_EQ(1, pendingFibers.size());
733 } else if (pendingFibers.size()) {
734 pendingFibers.back().setValue(0);
735 pendingFibers.pop_back();
737 loopController.stop();
741 loopController.loop(std::move(loopFunc));
// collectN() asking for 2 of 3 void-returning tasks whose bodies throw
// std::runtime_error("Runtime") after parking a promise. The visible
// expectation is that one promise is still parked afterwards.
744 TEST(FiberManager, collectNVoidThrow) {
745 std::vector<Promise<int>> pendingFibers;
746 bool taskAdded = false;
748 FiberManager manager(folly::make_unique<SimpleLoopController>());
749 auto& loopController =
750 dynamic_cast<SimpleLoopController&>(manager.loopController());
752 auto loopFunc = [&]() {
754 manager.addTask([&]() {
755 std::vector<std::function<void()>> funcs;
756 for (size_t i = 0; i < 3; ++i) {
757 funcs.push_back([i, &pendingFibers]() {
758 await([&pendingFibers](Promise<int> promise) {
759 pendingFibers.push_back(std::move(promise));
761 throw std::runtime_error("Runtime");
766 collectN(funcs.begin(), funcs.end(), 2);
768 EXPECT_EQ(1, pendingFibers.size());
772 } else if (pendingFibers.size()) {
773 pendingFibers.back().setValue(0);
774 pendingFibers.pop_back();
776 loopController.stop();
780 loopController.loop(std::move(loopFunc));
// collectAll() over three int-returning tasks. Expects no promise left parked
// when it returns and results[i] == i*2+1, i.e. results indexed by task
// position.
783 TEST(FiberManager, collectAll) {
784 std::vector<Promise<int>> pendingFibers;
785 bool taskAdded = false;
787 FiberManager manager(folly::make_unique<SimpleLoopController>());
788 auto& loopController =
789 dynamic_cast<SimpleLoopController&>(manager.loopController());
791 auto loopFunc = [&]() {
793 manager.addTask([&]() {
794 std::vector<std::function<int()>> funcs;
795 for (size_t i = 0; i < 3; ++i) {
796 funcs.push_back([i, &pendingFibers]() {
797 await([&pendingFibers](Promise<int> promise) {
798 pendingFibers.push_back(std::move(promise));
804 auto results = collectAll(funcs.begin(), funcs.end());
805 EXPECT_TRUE(pendingFibers.empty());
806 for (size_t i = 0; i < 3; ++i) {
807 EXPECT_EQ(i * 2 + 1, results[i]);
811 } else if (pendingFibers.size()) {
812 pendingFibers.back().setValue(0);
813 pendingFibers.pop_back();
815 loopController.stop();
819 loopController.loop(std::move(loopFunc));
// collectAll() over three void-returning tasks. The only visible expectation
// is that no promise is left parked once collectAll() returns.
822 TEST(FiberManager, collectAllVoid) {
823 std::vector<Promise<int>> pendingFibers;
824 bool taskAdded = false;
826 FiberManager manager(folly::make_unique<SimpleLoopController>());
827 auto& loopController =
828 dynamic_cast<SimpleLoopController&>(manager.loopController());
830 auto loopFunc = [&]() {
832 manager.addTask([&]() {
833 std::vector<std::function<void()>> funcs;
834 for (size_t i = 0; i < 3; ++i) {
835 funcs.push_back([i, &pendingFibers]() {
836 await([&pendingFibers](Promise<int> promise) {
837 pendingFibers.push_back(std::move(promise));
842 collectAll(funcs.begin(), funcs.end());
843 EXPECT_TRUE(pendingFibers.empty());
846 } else if (pendingFibers.size()) {
847 pendingFibers.back().setValue(0);
848 pendingFibers.pop_back();
850 loopController.stop();
854 loopController.loop(std::move(loopFunc));
// collectAny() over three int-returning tasks. A throw of
// std::runtime_error("This exception will be ignored") is visible in the task
// body. The test expects two promises to remain parked when collectAny()
// returns, and the winning result to be task id 2 with value 2*2+1.
857 TEST(FiberManager, collectAny) {
858 std::vector<Promise<int>> pendingFibers;
859 bool taskAdded = false;
861 FiberManager manager(folly::make_unique<SimpleLoopController>());
862 auto& loopController =
863 dynamic_cast<SimpleLoopController&>(manager.loopController());
865 auto loopFunc = [&]() {
867 manager.addTask([&]() {
868 std::vector<std::function<int()>> funcs;
869 for (size_t i = 0; i < 3; ++i) {
870 funcs.push_back([i, &pendingFibers]() {
871 await([&pendingFibers](Promise<int> promise) {
872 pendingFibers.push_back(std::move(promise));
875 throw std::runtime_error("This exception will be ignored");
881 auto result = collectAny(funcs.begin(), funcs.end());
882 EXPECT_EQ(2, pendingFibers.size());
883 EXPECT_EQ(2, result.first);
884 EXPECT_EQ(2 * 2 + 1, result.second);
887 } else if (pendingFibers.size()) {
888 pendingFibers.back().setValue(0);
889 pendingFibers.pop_back();
891 loopController.stop();
895 loopController.loop(std::move(loopFunc));
// Test helper shared by the main-context tests below.
//   ran           - out-flag owned by the caller (the line that sets it is
//                   not visible here; presumably set to true - confirm).
//   mainLocation  - address of an int known to live on the main stack.
//   fiberLocation - address of an int on the related fiber stack; callers in
//                   this file also pass nullptr for it.
899 /* Checks that this function was run from a main context,
900 by comparing an address on a stack to a known main stack address
901 and a known related fiber stack address. The assumption
902 is that fiber stack and main stack will be far enough apart,
903 while any two values on the same stack will be close. */
904 void expectMainContext(bool& ran, int* mainLocation, int* fiberLocation) {
906 /* 2 pages is a good guess */
// DISTANCE is expressed in int-sized elements, matching the int-pointer
// differences it is compared against below.
907 constexpr ssize_t DISTANCE = 0x2000 / sizeof(int);
909 EXPECT_TRUE(std::abs(&here - fiberLocation) > DISTANCE);
912 EXPECT_TRUE(std::abs(&here - mainLocation) < DISTANCE);
// Covers both entry points for running a callback on the main context:
//  - manager.runInMainContext() called directly from the test body, and
//  - the free runInMainContext() called from inside a fiber task.
// In both cases expectMainContext() verifies the stack the callback ran on.
// The in-fiber call returns an object whose `value` is expected to be 42
// (presumably the local type A, which has a deleted copy constructor).
920 TEST(FiberManager, runInMainContext) {
921 FiberManager manager(folly::make_unique<SimpleLoopController>());
922 auto& loopController =
923 dynamic_cast<SimpleLoopController&>(manager.loopController());
925 bool checkRan = false;
928 manager.runInMainContext(
929 [&]() { expectMainContext(checkRan, &mainLocation, nullptr); });
930 EXPECT_TRUE(checkRan);
934 manager.addTask([&]() {
936 explicit A(int value_) : value(value_) {}
937 A(const A&) = delete;
943 auto ret = runInMainContext([&]() {
944 expectMainContext(checkRan, &mainLocation, &stackLocation);
947 EXPECT_TRUE(checkRan);
948 EXPECT_EQ(42, ret.value);
951 loopController.loop([&]() { loopController.stop(); });
953 EXPECT_TRUE(checkRan);
// addTaskFinally(): the task function returns 1234 and the finally callback
// receives it as a Try<int>, then checks via expectMainContext() which stack
// it is running on. `checkRan` must be false before the loop runs and true
// after a single loop pass.
956 TEST(FiberManager, addTaskFinally) {
957 FiberManager manager(folly::make_unique<SimpleLoopController>());
958 auto& loopController =
959 dynamic_cast<SimpleLoopController&>(manager.loopController());
961 bool checkRan = false;
965 manager.addTaskFinally(
966 [&]() { return 1234; },
967 [&](Try<int>&& result) {
968 EXPECT_EQ(result.value(), 1234);
970 expectMainContext(checkRan, &mainLocation, nullptr);
973 EXPECT_FALSE(checkRan);
975 loopController.loop([&]() { loopController.stop(); });
977 EXPECT_TRUE(checkRan);
// Fiber pool behaviour when demand never exceeds maxFibersPoolSize (5).
// Two batches of five tasks are run. After each batch fibersAllocated() and
// fibersPoolSize() are both expected to be 5, so the second batch reuses the
// pooled fibers instead of allocating new ones.
980 TEST(FiberManager, fibersPoolWithinLimit) {
981 FiberManager::Options opts;
982 opts.maxFibersPoolSize = 5;
984 FiberManager manager(folly::make_unique<SimpleLoopController>(), opts);
985 auto& loopController =
986 dynamic_cast<SimpleLoopController&>(manager.loopController());
988 size_t fibersRun = 0;
990 for (size_t i = 0; i < 5; ++i) {
991 manager.addTask([&]() { ++fibersRun; });
993 loopController.loop([&]() { loopController.stop(); });
995 EXPECT_EQ(5, fibersRun);
996 EXPECT_EQ(5, manager.fibersAllocated());
997 EXPECT_EQ(5, manager.fibersPoolSize());
999 for (size_t i = 0; i < 5; ++i) {
1000 manager.addTask([&]() { ++fibersRun; });
1002 loopController.loop([&]() { loopController.stop(); });
1004 EXPECT_EQ(10, fibersRun);
1005 EXPECT_EQ(5, manager.fibersAllocated());
1006 EXPECT_EQ(5, manager.fibersPoolSize());
// Fiber pool behaviour when demand (10 tasks) exceeds maxFibersPoolSize (5).
// Before the loop runs: no task has run, 10 fibers are allocated and none is
// pooled. After one loop pass: all 10 tasks have run and both the allocated
// count and the pool size are back down to 5.
1009 TEST(FiberManager, fibersPoolOverLimit) {
1010 FiberManager::Options opts;
1011 opts.maxFibersPoolSize = 5;
1013 FiberManager manager(folly::make_unique<SimpleLoopController>(), opts);
1014 auto& loopController =
1015 dynamic_cast<SimpleLoopController&>(manager.loopController());
1017 size_t fibersRun = 0;
1019 for (size_t i = 0; i < 10; ++i) {
1020 manager.addTask([&]() { ++fibersRun; });
1023 EXPECT_EQ(0, fibersRun);
1024 EXPECT_EQ(10, manager.fibersAllocated());
1025 EXPECT_EQ(0, manager.fibersPoolSize());
1027 loopController.loop([&]() { loopController.stop(); });
1029 EXPECT_EQ(10, fibersRun);
1030 EXPECT_EQ(5, manager.fibersAllocated());
1031 EXPECT_EQ(5, manager.fibersPoolSize());
// Fulfilling promises from other threads. Two fibers park their promises in
// savedPromise[0..1]; two std::threads then set the values 42 and 43. The
// results must stay 0 until the manager loops again, and the loop controller
// is expected to have been asked to schedule remotely exactly once for both
// fulfilments.
1034 TEST(FiberManager, remoteFiberBasic) {
1035 FiberManager manager(folly::make_unique<SimpleLoopController>());
1036 auto& loopController =
1037 dynamic_cast<SimpleLoopController&>(manager.loopController());
1040 result[0] = result[1] = 0;
1041 folly::Optional<Promise<int>> savedPromise[2];
1042 manager.addTask([&]() {
1044 [&](Promise<int> promise) { savedPromise[0] = std::move(promise); });
1046 manager.addTask([&]() {
1048 [&](Promise<int> promise) { savedPromise[1] = std::move(promise); });
1051 manager.loopUntilNoReady();
1053 EXPECT_TRUE(savedPromise[0].hasValue());
1054 EXPECT_TRUE(savedPromise[1].hasValue());
1055 EXPECT_EQ(0, result[0]);
1056 EXPECT_EQ(0, result[1]);
1058 std::thread remoteThread0{[&]() { savedPromise[0]->setValue(42); }};
1059 std::thread remoteThread1{[&]() { savedPromise[1]->setValue(43); }};
1060 remoteThread0.join();
1061 remoteThread1.join();
1062 EXPECT_EQ(0, result[0]);
1063 EXPECT_EQ(0, result[1]);
1064 /* Should only have scheduled once */
1065 EXPECT_EQ(1, loopController.remoteScheduleCalled());
1067 manager.loopUntilNoReady();
1068 EXPECT_EQ(42, result[0]);
1069 EXPECT_EQ(43, result[1]);
// Adding tasks from other threads with addTaskRemote(). Two std::threads each
// add a task that parks its promise in savedPromise[]; after joining them and
// looping, both promises must be saved while the results are still 0. The
// promises are then fulfilled with 42 and 43 on the test thread, and the
// results only become visible after one more loopUntilNoReady().
1072 TEST(FiberManager, addTaskRemoteBasic) {
1073 FiberManager manager(folly::make_unique<SimpleLoopController>());
1076 result[0] = result[1] = 0;
1077 folly::Optional<Promise<int>> savedPromise[2];
1079 std::thread remoteThread0{[&]() {
1080 manager.addTaskRemote([&]() {
1082 [&](Promise<int> promise) { savedPromise[0] = std::move(promise); });
1085 std::thread remoteThread1{[&]() {
1086 manager.addTaskRemote([&]() {
1088 [&](Promise<int> promise) { savedPromise[1] = std::move(promise); });
1091 remoteThread0.join();
1092 remoteThread1.join();
1094 manager.loopUntilNoReady();
1096 EXPECT_TRUE(savedPromise[0].hasValue());
1097 EXPECT_TRUE(savedPromise[1].hasValue());
1098 EXPECT_EQ(0, result[0]);
1099 EXPECT_EQ(0, result[1]);
1101 savedPromise[0]->setValue(42);
1102 savedPromise[1]->setValue(43);
1104 EXPECT_EQ(0, result[0]);
1105 EXPECT_EQ(0, result[1]);
1107 manager.loopUntilNoReady();
1108 EXPECT_EQ(42, result[0]);
1109 EXPECT_EQ(43, result[1]);
// hasTasks() must account for remotely added tasks: a thread adds one task
// via addTaskRemote() that increments `counter`; the test loops while
// fm.hasTasks() is true, then expects no tasks left and counter == 1.
1112 TEST(FiberManager, remoteHasTasks) {
1114 FiberManager fm(folly::make_unique<SimpleLoopController>());
1115 std::thread remote([&]() { fm.addTaskRemote([&]() { ++counter; }); });
1119 while (fm.hasTasks()) {
1120 fm.loopUntilNoReady();
1123 EXPECT_FALSE(fm.hasTasks());
1124 EXPECT_EQ(counter, 1);
// hasTasks() across the life of a remotely added task that parks on a
// promise: it is expected to be true inside the task, after the task was
// added, after looping while the promise is unfulfilled, and after a second
// thread sets the value 47. It only turns false after the final
// loopUntilNoReady(), at which point `result` must be 47.
1127 TEST(FiberManager, remoteHasReadyTasks) {
1129 folly::Optional<Promise<int>> savedPromise;
1130 FiberManager fm(folly::make_unique<SimpleLoopController>());
1131 std::thread remote([&]() {
1132 fm.addTaskRemote([&]() {
1134 [&](Promise<int> promise) { savedPromise = std::move(promise); });
1135 EXPECT_TRUE(fm.hasTasks());
1140 EXPECT_TRUE(fm.hasTasks());
1142 fm.loopUntilNoReady();
1143 EXPECT_TRUE(fm.hasTasks());
1145 std::thread remote2([&]() { savedPromise->setValue(47); });
1147 EXPECT_TRUE(fm.hasTasks());
1149 fm.loopUntilNoReady();
1150 EXPECT_FALSE(fm.hasTasks());
1152 EXPECT_EQ(result, 47);
// Shared body of the fiber-local tests, parameterised on the local data type.
// The manager is constructed with LocalType<Data>. Visible expectations:
//  - local<Data>().value starts at 42;
//  - a value written before addTask()/addTaskRemote() (44 resp. 43) is what
//    the task created afterwards observes;
//  - after collectAny() over a one-element task list, the value read is 43.
// Data must expose a `value` member that defaults to 42.
// NOTE(review): task boundaries are not fully visible in this view, so the
// grouping of the checks above is an assumption to confirm.
1155 template <typename Data>
1156 void testFiberLocal() {
1158 LocalType<Data>(), folly::make_unique<SimpleLoopController>());
1161 EXPECT_EQ(42, local<Data>().value);
1163 local<Data>().value = 43;
1166 EXPECT_EQ(43, local<Data>().value);
1168 local<Data>().value = 44;
1170 addTask([]() { EXPECT_EQ(44, local<Data>().value); });
1175 EXPECT_EQ(42, local<Data>().value);
1177 local<Data>().value = 43;
1179 fm.addTaskRemote([]() { EXPECT_EQ(43, local<Data>().value); });
1183 EXPECT_EQ(42, local<Data>().value);
1184 local<Data>().value = 43;
1187 EXPECT_EQ(43, local<Data>().value);
1188 local<Data>().value = 44;
1190 std::vector<std::function<void()>> tasks{task};
1191 collectAny(tasks.begin(), tasks.end());
1193 EXPECT_EQ(43, local<Data>().value);
1196 fm.loopUntilNoReady();
1197 EXPECT_FALSE(fm.hasTasks());
// Runs the shared fiber-local checks with SimpleData (its definition is not
// visible in this view).
1200 TEST(FiberManager, fiberLocal) {
1205 testFiberLocal<SimpleData>();
// Runs the shared fiber-local checks with LargeData. A 1 MiB char array is
// visible (presumably a LargeData member that forces the local data out of
// inline storage, as the test name suggests - confirm).
1208 TEST(FiberManager, fiberLocalHeap) {
1210 char _[1024 * 1024];
1214 testFiberLocal<LargeData>();
// Fiber-local data accessed while it is being torn down. The visible code
// reads local<CrazyData>().data expecting 42 and then resets it to 0 to avoid
// looping forever (presumably inside CrazyData's destructor - confirm). A
// task sets data to 41, and after looping the manager must have no tasks.
1217 TEST(FiberManager, fiberLocalDestructor) {
1224 EXPECT_EQ(42, local<CrazyData>().data);
1225 // Make sure we don't have infinite loop
1226 local<CrazyData>().data = 0;
1233 LocalType<CrazyData>(), folly::make_unique<SimpleLoopController>());
1235 fm.addTask([]() { local<CrazyData>().data = 41; });
1237 fm.loopUntilNoReady();
1238 EXPECT_FALSE(fm.hasTasks());
// Yield test: one task is added, the loop runs until stopped, and `checkRan`
// must be true at the end.
// NOTE(review): the task body and the place where checkRan is set are not
// visible in this view.
1241 TEST(FiberManager, yieldTest) {
1242 FiberManager manager(folly::make_unique<SimpleLoopController>());
1243 auto& loopController =
1244 dynamic_cast<SimpleLoopController&>(manager.loopController());
1246 bool checkRan = false;
1248 manager.addTask([&]() {
1253 loopController.loop([&]() {
1255 loopController.stop();
1259 EXPECT_TRUE(checkRan);
// Checks folly::RequestContext propagation through the fiber manager.
// Each task is created under its own RequestContextScopeGuard and captures
// the context current at creation time (rcontext1..rcontext4). While running,
// including inside nested callbacks and in a finally-style callback taking
// Try<folly::Unit>, the task must observe that same context. On the caller
// side, the context installed before driving the manager (`rcontext`) must be
// unchanged before and after every fm.loopUntilNoReady() call, and each of
// checkRun1..checkRun4 must have been set by the time its stage is checked.
1262 TEST(FiberManager, RequestContext) {
1263 FiberManager fm(folly::make_unique<SimpleLoopController>());
1265 bool checkRun1 = false;
1266 bool checkRun2 = false;
1267 bool checkRun3 = false;
1268 bool checkRun4 = false;
1269 folly::fibers::Baton baton1;
1270 folly::fibers::Baton baton2;
1271 folly::fibers::Baton baton3;
1272 folly::fibers::Baton baton4;
1275 folly::RequestContextScopeGuard rctx;
1276 auto rcontext1 = folly::RequestContext::get();
1277 fm.addTask([&, rcontext1]() {
1278 EXPECT_EQ(rcontext1, folly::RequestContext::get());
1280 [&]() { EXPECT_EQ(rcontext1, folly::RequestContext::get()); });
1281 EXPECT_EQ(rcontext1, folly::RequestContext::get());
1283 [&]() { EXPECT_EQ(rcontext1, folly::RequestContext::get()); });
1288 folly::RequestContextScopeGuard rctx;
1289 auto rcontext2 = folly::RequestContext::get();
1290 fm.addTaskRemote([&, rcontext2]() {
1291 EXPECT_EQ(rcontext2, folly::RequestContext::get());
1293 EXPECT_EQ(rcontext2, folly::RequestContext::get());
1298 folly::RequestContextScopeGuard rctx;
1299 auto rcontext3 = folly::RequestContext::get();
1302 EXPECT_EQ(rcontext3, folly::RequestContext::get());
1304 EXPECT_EQ(rcontext3, folly::RequestContext::get());
1306 return folly::Unit();
1308 [&, rcontext3](Try<folly::Unit>&& /* t */) {
1309 EXPECT_EQ(rcontext3, folly::RequestContext::get());
1314 folly::RequestContext::setContext(nullptr);
1316 folly::RequestContextScopeGuard rctx;
1317 auto rcontext4 = folly::RequestContext::get();
1319 EXPECT_EQ(rcontext4, folly::RequestContext::get());
1324 folly::RequestContextScopeGuard rctx;
1325 auto rcontext = folly::RequestContext::get();
1327 fm.loopUntilNoReady();
1328 EXPECT_EQ(rcontext, folly::RequestContext::get());
1331 EXPECT_EQ(rcontext, folly::RequestContext::get());
1332 fm.loopUntilNoReady();
1333 EXPECT_TRUE(checkRun1);
1334 EXPECT_EQ(rcontext, folly::RequestContext::get());
1337 EXPECT_EQ(rcontext, folly::RequestContext::get());
1338 fm.loopUntilNoReady();
1339 EXPECT_TRUE(checkRun2);
1340 EXPECT_EQ(rcontext, folly::RequestContext::get());
1343 EXPECT_EQ(rcontext, folly::RequestContext::get());
1344 fm.loopUntilNoReady();
1345 EXPECT_TRUE(checkRun3);
1346 EXPECT_EQ(rcontext, folly::RequestContext::get());
1349 EXPECT_EQ(rcontext, folly::RequestContext::get());
1350 fm.loopUntilNoReady();
1351 EXPECT_TRUE(checkRun4);
1352 EXPECT_EQ(rcontext, folly::RequestContext::get());
// Periodic shrinking of the fiber pool with fibersPoolResizePeriodMs = 300
// and maxFibersPoolSize = 5 on an EventBase-driven manager. 30 tasks are
// added; per the inline comments, the first batons.size() (10) of them are
// kept active. Expected (fibersAllocated, fibersPoolSize) at each checkpoint:
//   before running ............................ (30, 0), no task run yet
//   after running ............................. (30, 20), all 30 tasks started
//   after 400 ms + loopOnce() ................. (30, 20)
//   after another 400 ms + loopOnce() ......... (15, 5)
//   after the loop over all batons ............ (15, 15)
//   after 400 ms + loopOnce() ................. (10, 10)
//   after a final 400 ms ...................... (5, 5)
// The test depends on wall-clock sleeps, so it is timing sensitive.
1356 TEST(FiberManager, resizePeriodically) {
1357 FiberManager::Options opts;
1358 opts.fibersPoolResizePeriodMs = 300;
1359 opts.maxFibersPoolSize = 5;
1361 FiberManager manager(folly::make_unique<EventBaseLoopController>(), opts);
1363 folly::EventBase evb;
1364 dynamic_cast<EventBaseLoopController&>(manager.loopController())
1365 .attachEventBase(evb);
1367 std::vector<Baton> batons(10);
1369 size_t tasksRun = 0;
1370 for (size_t i = 0; i < 30; ++i) {
1371 manager.addTask([i, &batons, &tasksRun]() {
1373 // Keep some fibers active indefinitely
1374 if (i < batons.size()) {
1380 EXPECT_EQ(0, tasksRun);
1381 EXPECT_EQ(30, manager.fibersAllocated());
1382 EXPECT_EQ(0, manager.fibersPoolSize());
1385 EXPECT_EQ(30, tasksRun);
1386 EXPECT_EQ(30, manager.fibersAllocated());
1387 // Can go over maxFibersPoolSize, 10 of 30 fibers still active
1388 EXPECT_EQ(20, manager.fibersPoolSize());
1390 std::this_thread::sleep_for(std::chrono::milliseconds(400));
1391 evb.loopOnce(); // no fibers active in this period
1392 EXPECT_EQ(30, manager.fibersAllocated());
1393 EXPECT_EQ(20, manager.fibersPoolSize());
1395 std::this_thread::sleep_for(std::chrono::milliseconds(400));
1396 evb.loopOnce(); // should shrink fibers pool to maxFibersPoolSize
1397 EXPECT_EQ(15, manager.fibersAllocated());
1398 EXPECT_EQ(5, manager.fibersPoolSize());
1400 for (size_t i = 0; i < batons.size(); ++i) {
1404 EXPECT_EQ(15, manager.fibersAllocated());
1405 EXPECT_EQ(15, manager.fibersPoolSize());
1407 std::this_thread::sleep_for(std::chrono::milliseconds(400));
1408 evb.loopOnce(); // 10 fibers active in last period
1409 EXPECT_EQ(10, manager.fibersAllocated());
1410 EXPECT_EQ(10, manager.fibersPoolSize());
1412 std::this_thread::sleep_for(std::chrono::milliseconds(400));
1414 EXPECT_EQ(5, manager.fibersAllocated());
1415 EXPECT_EQ(5, manager.fibersPoolSize());
// baton.wait() with an explicit Baton::TimeoutHandler. A fiber waits on the
// baton; after the first loopUntilNoReady() the baton is not signalled and no
// fiber has finished. A 250 ms timeout is then scheduled on the handler and
// the test sleeps 500 ms; even so, nothing changes until the manager is
// driven again, after which fibersRun must be 1.
1418 TEST(FiberManager, batonWaitTimeoutHandler) {
1419 FiberManager manager(folly::make_unique<EventBaseLoopController>());
1421 folly::EventBase evb;
1422 dynamic_cast<EventBaseLoopController&>(manager.loopController())
1423 .attachEventBase(evb);
1425 size_t fibersRun = 0;
1427 Baton::TimeoutHandler timeoutHandler;
1429 manager.addTask([&]() {
1430 baton.wait(timeoutHandler);
1433 manager.loopUntilNoReady();
1435 EXPECT_FALSE(baton.try_wait());
1436 EXPECT_EQ(0, fibersRun);
1438 timeoutHandler.scheduleTimeout(std::chrono::milliseconds(250));
1439 std::this_thread::sleep_for(std::chrono::milliseconds(500));
1441 EXPECT_FALSE(baton.try_wait());
1442 EXPECT_EQ(0, fibersRun);
1445 manager.loopUntilNoReady();
1447 EXPECT_EQ(1, fibersRun);
// Adds kNumTimeoutTasks tasks that each wait on a baton with their own
// Baton::TimeoutHandler; the task that brings tasksCount to zero asks the
// event base to terminate its loop.
1450 TEST(FiberManager, batonWaitTimeoutMany) {
1451 FiberManager manager(folly::make_unique<EventBaseLoopController>());
1453 folly::EventBase evb;
1454 dynamic_cast<EventBaseLoopController&>(manager.loopController())
1455 .attachEventBase(evb);
1457 constexpr size_t kNumTimeoutTasks = 10000;
// Number of tasks still outstanding; decremented by each task after its wait.
1458 size_t tasksCount = kNumTimeoutTasks;
1460 // We add many tasks to hit timeout queue deallocation logic.
1461 for (size_t i = 0; i < kNumTimeoutTasks; ++i) {
1462 manager.addTask([&]() {
1464 Baton::TimeoutHandler timeoutHandler;
// A nested fiber task schedules the 1000ms timeout on this task's handler.
1466 folly::fibers::addTask([&] {
1467 timeoutHandler.scheduleTimeout(std::chrono::milliseconds(1000));
1470 baton.wait(timeoutHandler);
// The last task to get past its wait stops the event loop.
1471 if (--tasksCount == 0) {
1472 evb.terminateLoopSoon();
// Checks that the values obtained for tasks added with addTaskFuture() and
// addTaskRemoteFuture() equal the values those tasks return.
1480 TEST(FiberManager, remoteFutureTest) {
1481 FiberManager fiberManager(folly::make_unique<SimpleLoopController>());
1482 auto& loopController =
1483 dynamic_cast<SimpleLoopController&>(fiberManager.loopController());
// f1 is added through the local API, f2 through the remote API.
1487 auto f1 = fiberManager.addTaskFuture([&]() { return testValue1; });
1488 auto f2 = fiberManager.addTaskRemoteFuture([&]() { return testValue2; });
// Run the loop with a loop function that immediately requests a stop.
1489 loopController.loop([&]() { loopController.stop(); });
1493 EXPECT_EQ(v1, testValue1);
1494 EXPECT_EQ(v2, testValue2);
1497 // Test that a void function produces a Future<Unit>.
// Adds one void-returning task through addTaskFuture() and one through
// addTaskRemoteFuture(), binds both results to folly::Future<folly::Unit>, and
// asserts that each task body ran.
1498 TEST(FiberManager, remoteFutureVoidUnitTest) {
1499 FiberManager fiberManager(folly::make_unique<SimpleLoopController>());
1500 auto& loopController =
1501 dynamic_cast<SimpleLoopController&>(fiberManager.loopController());
// Set to true by the locally added task.
1503 bool ranLocal = false;
1504 folly::Future<folly::Unit> futureLocal =
1505 fiberManager.addTaskFuture([&]() { ranLocal = true; });
// Set to true by the remotely added task.
1507 bool ranRemote = false;
1508 folly::Future<folly::Unit> futureRemote =
1509 fiberManager.addTaskRemoteFuture([&]() { ranRemote = true; });
// Run the loop with a loop function that immediately requests a stop.
1511 loopController.loop([&]() { loopController.stop(); });
1514 ASSERT_TRUE(ranLocal);
// Wait on the remote future before checking the remote flag.
1516 futureRemote.wait();
1517 ASSERT_TRUE(ranRemote);
// Runs a task on the fiber manager of outerEvb which, from the main context,
// adds a task to the fiber manager of innerEvb and drives innerEvb's loop.
// At each level the current manager reported by getFiberManagerUnsafe() is
// compared with the manager of the corresponding event base.
1520 TEST(FiberManager, nestedFiberManagers) {
1521 folly::EventBase outerEvb;
1522 folly::EventBase innerEvb;
1524 getFiberManager(outerEvb).addTask([&]() {
1526 &getFiberManager(outerEvb), FiberManager::getFiberManagerUnsafe());
// Leave the fiber and run the nested event loop from the main context.
1528 runInMainContext([&]() {
1529 getFiberManager(innerEvb).addTask([&]() {
1531 &getFiberManager(innerEvb), FiberManager::getFiberManagerUnsafe());
// The inner task ends the inner loop started below.
1533 innerEvb.terminateLoopSoon();
1536 innerEvb.loopForever();
// Same comparison against the outer manager as before the nested loop.
1540 &getFiberManager(outerEvb), FiberManager::getFiberManagerUnsafe());
1542 outerEvb.terminateLoopSoon();
1545 outerEvb.loopForever();
// Shares one folly::fibers::Semaphore with kNumTokens tokens between two
// threads. Each thread runs its own FiberManager/EventBase with kTasks fibers
// looping kIterations times, and the per-thread counters are checked against
// the bounds [0, kNumTokens).
1548 TEST(FiberManager, semaphore) {
1549 constexpr size_t kTasks = 10;
1550 constexpr size_t kIterations = 10000;
1551 constexpr size_t kNumTokens = 10;
1553 Semaphore sem(kNumTokens);
// Per-thread body: counter and baton are supplied by the calling thread.
1557 auto task = [&sem, kTasks, kIterations, kNumTokens](
1558 int& counter, folly::fibers::Baton& baton) {
1559 FiberManager manager(folly::make_unique<EventBaseLoopController>());
1560 folly::EventBase evb;
1561 dynamic_cast<EventBaseLoopController&>(manager.loopController())
1562 .attachEventBase(evb);
// Not an owner of evb: the custom deleter only calls terminateLoopSoon(), so
// the loop is asked to stop when the last copy of this shared_ptr (each task
// lambda below holds one) is released.
1565 std::shared_ptr<folly::EventBase> completionCounter(
1566 &evb, [](folly::EventBase* evb) { evb->terminateLoopSoon(); });
1568 for (size_t i = 0; i < kTasks; ++i) {
1569 manager.addTask([&, completionCounter]() {
1570 for (size_t j = 0; j < kIterations; ++j) {
// The counter is expected to stay within [0, kNumTokens).
1576 EXPECT_LT(counter, kNumTokens);
1577 EXPECT_GE(counter, 0);
1587 folly::fibers::Baton batonA;
1588 folly::fibers::Baton batonB;
// Two OS threads run the same task body against the shared semaphore.
1589 std::thread threadA([&] { task(counterA, batonA); });
1590 std::thread threadB([&] { task(counterB, batonB); });
// Final bounds check on both per-thread counters.
1597 EXPECT_LT(counterA, kNumTokens);
1598 EXPECT_LT(counterB, kNumTokens);
1599 EXPECT_GE(counterA, 0);
1600 EXPECT_GE(counterB, 0);
// Adds `index` to a thread_local BatchDispatcher<int, std::string, ExecutorT>
// and expects the returned result to be the string form of `index`.
// The dispatch callback expects every batch to contain exactly `batchSize`
// elements and converts each element to a string.
// Because the dispatcher is thread_local, it is constructed once per thread,
// so `executor` and the by-value captures are those of the first call on that
// thread.
1603 template <typename ExecutorT>
1604 void singleBatchDispatch(ExecutorT& executor, int batchSize, int index) {
1605 thread_local BatchDispatcher<int, std::string, ExecutorT> batchDispatcher(
1606 executor, [=](std::vector<int>&& batch) {
1607 EXPECT_EQ(batchSize, batch.size());
1608 std::vector<std::string> results;
1609 for (auto& it : batch) {
1610 results.push_back(folly::to<std::string>(it));
// Move a copy into add() so `index` is still available for the comparison.
1615 auto indexCopy = index;
1616 auto result = batchDispatcher.add(std::move(indexCopy));
1617 EXPECT_EQ(folly::to<std::string>(index), result.get());
// Drives singleBatchDispatch() from tasks added to the fiber manager of an
// EventBase, in two rounds; the second round goes through the same
// BatchDispatcher as the first.
1620 TEST(FiberManager, batchDispatchTest) {
1621 folly::EventBase evb;
1622 auto& executor = getFiberManager(evb);
1624 // Launch multiple fibers with a single id.
1625 executor.add([&]() {
// One call to singleBatchDispatch() per index in [0, batchSize).
1627 for (int i = 0; i < batchSize; i++) {
1629 [=, &executor]() { singleBatchDispatch(executor, batchSize, i); });
1634 // Reuse the same BatchDispatcher to batch once again.
1635 executor.add([&]() {
1637 for (int i = 0; i < batchSize; i++) {
1639 [=, &executor]() { singleBatchDispatch(executor, batchSize, i); });
// Inner stage of the double-batch test: adds `input` (a group of ints) to a
// thread_local BatchDispatcher and returns the future produced by add().
// The dispatch callback converts every element of every group to a string and
// expects the total element count across the batch to equal
// `totalNumberOfElements`.
1645 template <typename ExecutorT>
1646 folly::Future<std::vector<std::string>> doubleBatchInnerDispatch(
1647 ExecutorT& executor,
1648 int totalNumberOfElements,
1649 std::vector<int> input) {
1650 thread_local BatchDispatcher<
1652 std::vector<std::string>,
1654 batchDispatcher(executor, [=](std::vector<std::vector<int>>&& batch) {
1655 std::vector<std::vector<std::string>> results;
// Running total of elements seen across all groups in this batch.
1656 int numberOfElements = 0;
1657 for (auto& unit : batch) {
1658 numberOfElements += unit.size();
1659 std::vector<std::string> result;
1660 for (auto& element : unit) {
1661 result.push_back(folly::to<std::string>(element));
1663 results.push_back(std::move(result));
1665 EXPECT_EQ(totalNumberOfElements, numberOfElements);
1669 return batchDispatcher.add(std::move(input));
1673 * Batch values in groups of 5, and then call inner dispatch.
// Outer stage of the double-batch test: adds `index` to a thread_local
// BatchDispatcher and expects the result to be the string form of `index`.
// The dispatch callback expects a batch of `totalNumberOfElements` values,
// forwards them to doubleBatchInnerDispatch() in groups of 5, and flattens the
// inner results into one vector of strings.
1675 template <typename ExecutorT>
1676 void doubleBatchOuterDispatch(
1677 ExecutorT& executor,
1678 int totalNumberOfElements,
1680 thread_local BatchDispatcher<int, std::string, ExecutorT>
1681 batchDispatcher(executor, [=, &executor](std::vector<int>&& batch) {
1682 EXPECT_EQ(totalNumberOfElements, batch.size());
1683 std::vector<std::string> results;
1684 std::vector<folly::Future<std::vector<std::string>>>
1685 innerDispatchResultFutures;
// Accumulates values until a full group of 5 is available.
1687 std::vector<int> group;
1688 for (auto unit : batch) {
1689 group.push_back(unit);
1690 if (group.size() == 5) {
1691 auto localGroup = group;
1694 innerDispatchResultFutures.push_back(doubleBatchInnerDispatch(
1695 executor, totalNumberOfElements, localGroup));
1700 innerDispatchResultFutures.begin(), innerDispatchResultFutures.end())
1702 std::vector<Try<std::vector<std::string>>> innerDispatchResults) {
// Flatten the per-group results, in order, into `results`.
1703 for (auto& unit : innerDispatchResults) {
1704 for (auto& element : unit.value()) {
1705 results.push_back(element);
// Move a copy into add() so `index` is still available for the comparison.
1713 auto indexCopy = index;
1714 auto result = batchDispatcher.add(std::move(indexCopy));
1715 EXPECT_EQ(folly::to<std::string>(index), result.get());
// Launches 20 executor tasks, each calling doubleBatchOuterDispatch() with a
// distinct index in [0, totalNumberOfElements).
1718 TEST(FiberManager, doubleBatchDispatchTest) {
1719 folly::EventBase evb;
1720 auto& executor = getFiberManager(evb);
1722 // Launch multiple fibers with a single id.
1723 executor.add([&]() {
1724 int totalNumberOfElements = 20;
1725 for (int i = 0; i < totalNumberOfElements; i++) {
// Loop variables are captured by value; only the executor is by reference.
1726 executor.add([=, &executor]() {
1727 doubleBatchOuterDispatch(executor, totalNumberOfElements, i);
// Adds `i` to a thread_local BatchDispatcher whose dispatch callback always
// throws std::runtime_error, and expects that exception to surface from get()
// on the value returned by add().
1734 template <typename ExecutorT>
1735 void batchDispatchExceptionHandling(ExecutorT& executor, int i) {
1736 thread_local BatchDispatcher<int, int, ExecutorT> batchDispatcher(
1737 executor, [=, &executor](std::vector<int> &&) -> std::vector<int> {
1738 throw std::runtime_error("Surprise!!");
1741 EXPECT_THROW(batchDispatcher.add(i).get(), std::runtime_error);
// Calls batchDispatchExceptionHandling() from executor tasks, once per index
// in [0, 5).
1744 TEST(FiberManager, batchDispatchExceptionHandlingTest) {
1745 folly::EventBase evb;
1746 auto& executor = getFiberManager(evb);
1748 // Launch multiple fibers with a single id.
1749 executor.add([&]() {
1750 int totalNumberOfElements = 5;
1751 for (int i = 0; i < totalNumberOfElements; i++) {
1753 [=, &executor]() { batchDispatchExceptionHandling(executor, i); });
// Helpers shared by the ABD_* (AtomicBatchDispatcher) tests: value/result type
// aliases, an optional trace stream, the Job type, job creation/dispatch
// helpers and result validators.
1759 namespace AtomicBatchDispatcherTesting {
// Input and result types used with AtomicBatchDispatcher in these tests.
1761 using ValueT = size_t;
1762 using ResultT = std::string;
// Signature of the user-supplied batch dispatch function.
1763 using DispatchFunctionT =
1764 folly::Function<std::vector<ResultT>(std::vector<ValueT>&&)>;
1766 #define ENABLE_TRACE_IN_TEST 0 // Set to 1 to debug issues in ABD tests
1767 #if ENABLE_TRACE_IN_TEST
1768 #define OUTPUT_TRACE std::cerr
1769 #else // ENABLE_TRACE_IN_TEST
// Stand-in for a stream when tracing is off: has operator<< overloads for
// arbitrary values and for stream manipulators such as std::endl.
1770 struct DevNullPiper {
1771 template <typename T>
1772 DevNullPiper& operator<<(const T&) {
1776 DevNullPiper& operator<<(std::ostream& (*)(std::ostream&)) {
1780 #define OUTPUT_TRACE devNullPiper
1781 #endif // ENABLE_TRACE_IN_TEST
// Dispatcher token held by a job; dispatchJobs() calls dispatch() on it.
1784 AtomicBatchDispatcher<ValueT, ResultT>::Token token;
1787 void preprocess(FiberManager& executor, bool die) {
1788 // Yield for a random duration [0, 10) ms to simulate I/O in preprocessing
1789 clock_t msecToDoIO = folly::Random::rand32() % 10;
// Times are taken from clock() and converted to milliseconds.
1790 double start = (1000.0 * clock()) / CLOCKS_PER_SEC;
1791 double endAfter = start + msecToDoIO;
1792 while ((1000.0 * clock()) / CLOCKS_PER_SEC < endAfter) {
1796 throw std::logic_error("Simulating preprocessing failure");
// Takes ownership of the token and records the input value.
1800 Job(AtomicBatchDispatcher<ValueT, ResultT>::Token&& t, ValueT i)
1801 : token(std::move(t)), input(i) {}
1803 Job(Job&&) = default;
1804 Job& operator=(Job&&) = default;
// Converts a single input value to its ResultT (string) representation.
1807 ResultT processSingleInput(ValueT&& input) {
1808 return folly::to<ResultT>(std::move(input));
// Batch dispatch function used by the tests: applies processSingleInput() to
// every input, in order.
1811 std::vector<ResultT> userDispatchFunc(std::vector<ValueT>&& inputs) {
1812 size_t expectedCount = inputs.size();
1813 std::vector<ResultT> results;
1814 results.reserve(expectedCount);
1815 for (size_t i = 0; i < expectedCount; ++i) {
1816 results.emplace_back(processSingleInput(std::move(inputs[i])));
1822 AtomicBatchDispatcher<ValueT, ResultT>& atomicBatchDispatcher,
1823 std::vector<Job>& jobs,
// One job per index; each takes a fresh token from the dispatcher and uses
// its index as the input value.
1826 for (size_t i = 0; i < count; ++i) {
1827 jobs.emplace_back(Job(atomicBatchDispatcher.getToken(), i));
// Kind of failure that dispatchJobs() injects for the job at problemIndex.
1831 enum class DispatchProblem {
1838 FiberManager& executor,
1839 std::vector<Job>& jobs,
1840 std::vector<folly::Optional<folly::Future<ResultT>>>& results,
1841 DispatchProblem dispatchProblem = DispatchProblem::None,
1842 size_t problemIndex = size_t(-1)) {
1844 dispatchProblem == DispatchProblem::None || problemIndex < jobs.size());
// One slot per job; a slot is filled only when that job's dispatch() is called.
1846 results.resize(jobs.size());
1847 for (size_t i = 0; i < jobs.size(); ++i) {
1849 [i, &executor, &jobs, &results, dispatchProblem, problemIndex]() {
// The job is moved out of `jobs` into a local for this task.
1851 Job job(std::move(jobs[i]));
1853 if (dispatchProblem == DispatchProblem::PreprocessThrows) {
1854 if (i == problemIndex) {
1855 EXPECT_THROW(job.preprocess(executor, true), std::logic_error);
1860 job.preprocess(executor, false);
1861 OUTPUT_TRACE << "Dispatching job #" << i << std::endl;
1862 results[i] = job.token.dispatch(job.input);
1863 OUTPUT_TRACE << "Result future filled for job #" << i << std::endl;
// A second dispatch() on the same token is expected to throw.
1865 if (dispatchProblem == DispatchProblem::DuplicateDispatch) {
1866 if (i == problemIndex) {
1867 EXPECT_THROW(job.token.dispatch(job.input), std::logic_error);
1871 OUTPUT_TRACE << "Preprocessing failed for job #" << i << std::endl;
// Reads results[i]->value(), tracing the value or any std::exception message.
1877 void validateResult(
1878 std::vector<folly::Optional<folly::Future<ResultT>>>& results,
1881 OUTPUT_TRACE << "results[" << i << "].value() : " << results[i]->value()
1883 } catch (std::exception& e) {
1884 OUTPUT_TRACE << "Exception : " << e.what() << std::endl;
// Failure-path validator: expects validateResult() to throw TException and
// the number of filled results to equal expectedNumResults.
1889 template <typename TException>
1890 void validateResults(
1891 std::vector<folly::Optional<folly::Future<ResultT>>>& results,
1892 size_t expectedNumResults) {
1893 size_t numResultsFilled = 0;
1894 for (size_t i = 0; i < results.size(); ++i) {
1899 EXPECT_THROW(validateResult(results, i), TException);
1901 EXPECT_EQ(numResultsFilled, expectedNumResults);
// Success-path validator: expects validateResult() not to throw, each value
// to match processSingleInput() of its index, and the number of filled
// results to equal expectedNumResults.
1904 void validateResults(
1905 std::vector<folly::Optional<folly::Future<ResultT>>>& results,
1906 size_t expectedNumResults) {
1907 size_t numResultsFilled = 0;
1908 for (size_t i = 0; i < results.size(); ++i) {
1913 EXPECT_NO_THROW(validateResult(results, i));
1914 ValueT expectedInput = i;
1916 results[i]->value(), processSingleInput(std::move(expectedInput)));
1918 EXPECT_EQ(numResultsFilled, expectedNumResults);
1921 } // AtomicBatchDispatcherTesting
// Common locals for the ABD_* tests: brings the testing namespace into scope
// and declares `evb`, `executor` (the fiber manager of evb), `COUNT` (11),
// `jobs` and `results` (both reserved to COUNT), and `dispatchFunc`.
// The expansion ends without a semicolon, leaving the `dispatchFunc`
// declaration unterminated.
1923 #define SET_UP_TEST_FUNC \
1924 using namespace AtomicBatchDispatcherTesting; \
1925 folly::EventBase evb; \
1926 auto& executor = getFiberManager(evb); \
1927 const size_t COUNT = 11; \
1928 std::vector<Job> jobs; \
1929 jobs.reserve(COUNT); \
1930 std::vector<folly::Optional<folly::Future<ResultT>>> results; \
1931 results.reserve(COUNT); \
1932 DispatchFunctionT dispatchFunc
// Baseline AtomicBatchDispatcher flow: create COUNT jobs, dispatch them,
// commit, and expect COUNT successful results.
1934 TEST(FiberManager, ABD_Test) {
1938 // Testing AtomicBatchDispatcher with explicit call to commit()
1940 dispatchFunc = userDispatchFunc;
1941 auto atomicBatchDispatcher =
1942 createAtomicBatchDispatcher(std::move(dispatchFunc));
1943 createJobs(atomicBatchDispatcher, jobs, COUNT);
1944 dispatchJobs(executor, jobs, results);
1945 atomicBatchDispatcher.commit();
// Success-path validator: every result must hold the expected value.
1947 validateResults(results, COUNT);
// Simulates user code throwing after the jobs were dispatched but before
// commit() is called, and expects all COUNT results to fail with
// std::logic_error.
1950 TEST(FiberManager, ABD_DispatcherDestroyedBeforeCallingCommit) {
1954 // Testing AtomicBatchDispatcher destroyed before calling commit.
1955 // Handles error cases for:
1956 // - User might have forgotten to add the call to commit() in the code
1957 // - An unexpected exception got thrown in user code before commit() is called
1960 dispatchFunc = userDispatchFunc;
1961 auto atomicBatchDispatcher =
1962 createAtomicBatchDispatcher(std::move(dispatchFunc));
1963 createJobs(atomicBatchDispatcher, jobs, COUNT);
1964 dispatchJobs(executor, jobs, results);
1965 throw std::runtime_error(
1966 "Unexpected exception in user code before commit called");
// The commit() call below comes after the throw above.
1967 atomicBatchDispatcher.commit();
1969 /* User code handles the exception and does not exit process */
// Failure-path validator: every result is expected to throw std::logic_error.
1972 validateResults<std::logic_error>(results, COUNT);
// Injects a preprocessing failure for job #8 and expects the COUNT - 1 filled
// results to fail with std::logic_error after commit().
1975 TEST(FiberManager, ABD_PreprocessingFailureTest) {
1979 // Testing preprocessing failure on a job throws
1981 dispatchFunc = userDispatchFunc;
1982 auto atomicBatchDispatcher =
1983 createAtomicBatchDispatcher(std::move(dispatchFunc));
1984 createJobs(atomicBatchDispatcher, jobs, COUNT);
// Job at index 8 is the one whose preprocess() is made to throw.
1985 dispatchJobs(executor, jobs, results, DispatchProblem::PreprocessThrows, 8);
1986 atomicBatchDispatcher.commit();
1988 validateResults<std::logic_error>(results, COUNT - 1);
// Has job #4 call dispatch() twice on its token; the duplicate call is checked
// (inside dispatchJobs) to throw std::logic_error.
1991 TEST(FiberManager, ABD_MultipleDispatchOnSameTokenErrorTest) {
1995 // Testing that calling dispatch more than once on the same token throws
1997 dispatchFunc = userDispatchFunc;
1998 auto atomicBatchDispatcher =
1999 createAtomicBatchDispatcher(std::move(dispatchFunc));
2000 createJobs(atomicBatchDispatcher, jobs, COUNT);
// Job at index 4 performs the duplicate dispatch.
2001 dispatchJobs(executor, jobs, results, DispatchProblem::DuplicateDispatch, 4);
2002 atomicBatchDispatcher.commit();
// Commits right after creating the jobs and expects every later getToken()
// call to throw std::logic_error, while the already-created jobs still yield
// COUNT successful results.
2006 TEST(FiberManager, ABD_GetTokenCalledAfterCommitTest) {
2010 // Testing that exception set on attempt to call getToken after commit called
2012 dispatchFunc = userDispatchFunc;
2013 auto atomicBatchDispatcher =
2014 createAtomicBatchDispatcher(std::move(dispatchFunc));
2015 createJobs(atomicBatchDispatcher, jobs, COUNT);
// commit() is called before any job has been dispatched.
2016 atomicBatchDispatcher.commit();
// getToken() is re-checked after commit, after dispatching and after
// validating the results.
2017 EXPECT_THROW(atomicBatchDispatcher.getToken(), std::logic_error);
2018 dispatchJobs(executor, jobs, results);
2019 EXPECT_THROW(atomicBatchDispatcher.getToken(), std::logic_error);
2021 validateResults(results, COUNT);
2022 EXPECT_THROW(atomicBatchDispatcher.getToken(), std::logic_error);
// Uses a batch dispatch function that throws std::runtime_error and expects
// all COUNT results to fail with that exception type.
2025 TEST(FiberManager, ABD_UserProvidedBatchDispatchThrowsTest) {
2029 // Testing that exception is set if user provided batch dispatch throws
2031 dispatchFunc = [](std::vector<ValueT>&& inputs) -> std::vector<ResultT> {
// The results are computed first and the exception is thrown afterwards.
2032 auto results = userDispatchFunc(std::move(inputs));
2033 throw std::runtime_error("Unexpected exception in user dispatch function");
2036 auto atomicBatchDispatcher =
2037 createAtomicBatchDispatcher(std::move(dispatchFunc));
2038 createJobs(atomicBatchDispatcher, jobs, COUNT);
2039 dispatchJobs(executor, jobs, results);
2040 atomicBatchDispatcher.commit();
2042 validateResults<std::runtime_error>(results, COUNT);
// Creates two folly::VirtualEventBase objects on the event base of a single
// ScopedEventBaseThread and adds one remote task to the fiber manager of each;
// the tasks call timed_wait() on a baton with 100ms and 200ms timeouts.
2045 TEST(FiberManager, VirtualEventBase) {
2046 folly::ScopedEventBaseThread thread;
// Both virtual event bases wrap the same underlying thread event base.
2049 folly::make_unique<folly::VirtualEventBase>(*thread.getEventBase());
2051 folly::make_unique<folly::VirtualEventBase>(*thread.getEventBase());
2056 getFiberManager(*evb1).addTaskRemote([&] {
2058 baton.timed_wait(std::chrono::milliseconds{100});
2063 getFiberManager(*evb2).addTaskRemote([&] {
2065 baton.timed_wait(std::chrono::milliseconds{200});
2078 * Test that we can properly track fiber stack usage.
2080 * This functionality can only be enabled when ASAN is disabled, so avoid
2081 * running this test with ASAN.
2083 #ifndef FOLLY_SANITIZE_ADDRESS
2084 TEST(FiberManager, recordStack) {
2086 folly::fibers::FiberManager::Options opts;
2087 opts.recordStackEvery = 1;
2089 FiberManager fm(folly::make_unique<SimpleLoopController>(), opts);
2090 auto& loopController =
2091 dynamic_cast<SimpleLoopController&>(fm.loopController());
2093 static constexpr size_t n = 1000;
2097 for (size_t i = 0; i < n; ++i) {
2100 for (size_t i = 0; i + 1 < n; ++i) {
2101 s += b[i] * b[i + 1];
2107 loopController.loop([&]() { loopController.stop(); });
2109 // Check that we properly accounted fiber stack usage.
2110 EXPECT_LT(n * sizeof(int), fm.stackHighWatermark());