2 * Copyright 2017 Facebook, Inc.
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
8 * http://www.apache.org/licenses/LICENSE-2.0
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
20 #include <folly/Memory.h>
21 #include <folly/Random.h>
22 #include <folly/futures/Future.h>
24 #include <folly/Conv.h>
25 #include <folly/fibers/AddTasks.h>
26 #include <folly/fibers/AtomicBatchDispatcher.h>
27 #include <folly/fibers/BatchDispatcher.h>
28 #include <folly/fibers/EventBaseLoopController.h>
29 #include <folly/fibers/FiberManager.h>
30 #include <folly/fibers/FiberManagerMap.h>
31 #include <folly/fibers/GenericBaton.h>
32 #include <folly/fibers/Semaphore.h>
33 #include <folly/fibers/SimpleLoopController.h>
34 #include <folly/fibers/TimedMutex.h>
35 #include <folly/fibers/WhenN.h>
36 #include <folly/io/async/ScopedEventBaseThread.h>
37 #include <folly/portability/GTest.h>
39 using namespace folly::fibers;
43 TEST(FiberManager, batonTimedWaitTimeout) {
44 bool taskAdded = false;
45 size_t iterations = 0;
47 FiberManager manager(folly::make_unique<SimpleLoopController>());
48 auto& loopController =
49 dynamic_cast<SimpleLoopController&>(manager.loopController());
51 auto loopFunc = [&]() {
53 manager.addTask([&]() {
56 auto res = baton.timed_wait(std::chrono::milliseconds(230));
59 EXPECT_EQ(5, iterations);
61 loopController.stop();
63 manager.addTask([&]() {
66 auto res = baton.timed_wait(std::chrono::milliseconds(130));
69 EXPECT_EQ(3, iterations);
71 loopController.stop();
75 std::this_thread::sleep_for(std::chrono::milliseconds(50));
80 loopController.loop(std::move(loopFunc));
83 TEST(FiberManager, batonTimedWaitPost) {
84 bool taskAdded = false;
85 size_t iterations = 0;
88 FiberManager manager(folly::make_unique<SimpleLoopController>());
89 auto& loopController =
90 dynamic_cast<SimpleLoopController&>(manager.loopController());
92 auto loopFunc = [&]() {
94 manager.addTask([&]() {
98 auto res = baton.timed_wait(std::chrono::milliseconds(130));
101 EXPECT_EQ(2, iterations);
103 loopController.stop();
107 std::this_thread::sleep_for(std::chrono::milliseconds(50));
109 if (iterations == 2) {
115 loopController.loop(std::move(loopFunc));
118 TEST(FiberManager, batonTimedWaitTimeoutEvb) {
119 size_t tasksComplete = 0;
121 folly::EventBase evb;
123 FiberManager manager(folly::make_unique<EventBaseLoopController>());
124 dynamic_cast<EventBaseLoopController&>(manager.loopController())
125 .attachEventBase(evb);
127 auto task = [&](size_t timeout_ms) {
130 auto start = EventBaseLoopController::Clock::now();
131 auto res = baton.timed_wait(std::chrono::milliseconds(timeout_ms));
132 auto finish = EventBaseLoopController::Clock::now();
137 std::chrono::duration_cast<std::chrono::milliseconds>(finish - start);
139 EXPECT_GT(duration_ms.count(), timeout_ms - 50);
140 EXPECT_LT(duration_ms.count(), timeout_ms + 50);
142 if (++tasksComplete == 2) {
143 evb.terminateLoopSoon();
147 evb.runInEventBaseThread([&]() {
148 manager.addTask([&]() { task(500); });
149 manager.addTask([&]() { task(250); });
154 EXPECT_EQ(2, tasksComplete);
157 TEST(FiberManager, batonTimedWaitPostEvb) {
158 size_t tasksComplete = 0;
160 folly::EventBase evb;
162 FiberManager manager(folly::make_unique<EventBaseLoopController>());
163 dynamic_cast<EventBaseLoopController&>(manager.loopController())
164 .attachEventBase(evb);
166 evb.runInEventBaseThread([&]() {
167 manager.addTask([&]() {
170 evb.tryRunAfterDelay([&]() { baton.post(); }, 100);
172 auto start = EventBaseLoopController::Clock::now();
173 auto res = baton.timed_wait(std::chrono::milliseconds(130));
174 auto finish = EventBaseLoopController::Clock::now();
179 std::chrono::duration_cast<std::chrono::milliseconds>(finish - start);
181 EXPECT_TRUE(duration_ms.count() > 95 && duration_ms.count() < 110);
183 if (++tasksComplete == 1) {
184 evb.terminateLoopSoon();
191 EXPECT_EQ(1, tasksComplete);
194 TEST(FiberManager, batonTryWait) {
195 FiberManager manager(folly::make_unique<SimpleLoopController>());
197 // Check if try_wait and post work as expected
200 manager.addTask([&]() {
201 while (!b.try_wait()) {
204 auto thr = std::thread([&]() {
205 std::this_thread::sleep_for(std::chrono::milliseconds(300));
209 manager.loopUntilNoReady();
214 // Check try_wait without post
215 manager.addTask([&]() {
217 while (cnt && !c.try_wait()) {
220 EXPECT_TRUE(!c.try_wait()); // must still hold
224 manager.loopUntilNoReady();
227 TEST(FiberManager, genericBatonFiberWait) {
228 FiberManager manager(folly::make_unique<SimpleLoopController>());
231 bool fiberRunning = false;
233 manager.addTask([&]() {
234 EXPECT_EQ(manager.hasActiveFiber(), true);
237 fiberRunning = false;
240 EXPECT_FALSE(fiberRunning);
241 manager.loopUntilNoReady();
242 EXPECT_TRUE(fiberRunning); // ensure fiber still active
244 auto thr = std::thread([&]() {
245 std::this_thread::sleep_for(std::chrono::milliseconds(300));
249 while (fiberRunning) {
250 manager.loopUntilNoReady();
256 TEST(FiberManager, genericBatonThreadWait) {
257 FiberManager manager(folly::make_unique<SimpleLoopController>());
259 std::atomic<bool> threadWaiting(false);
261 auto thr = std::thread([&]() {
262 threadWaiting = true;
264 threadWaiting = false;
267 while (!threadWaiting) {
269 std::this_thread::sleep_for(std::chrono::milliseconds(300));
271 manager.addTask([&]() {
272 EXPECT_EQ(manager.hasActiveFiber(), true);
273 EXPECT_TRUE(threadWaiting);
275 while (threadWaiting) {
279 manager.loopUntilNoReady();
283 TEST(FiberManager, addTasksNoncopyable) {
284 std::vector<Promise<int>> pendingFibers;
285 bool taskAdded = false;
287 FiberManager manager(folly::make_unique<SimpleLoopController>());
288 auto& loopController =
289 dynamic_cast<SimpleLoopController&>(manager.loopController());
291 auto loopFunc = [&]() {
293 manager.addTask([&]() {
294 std::vector<std::function<std::unique_ptr<int>()>> funcs;
295 for (int i = 0; i < 3; ++i) {
296 funcs.push_back([i, &pendingFibers]() {
297 await([&pendingFibers](Promise<int> promise) {
298 pendingFibers.push_back(std::move(promise));
300 return folly::make_unique<int>(i * 2 + 1);
304 auto iter = addTasks(funcs.begin(), funcs.end());
307 while (iter.hasNext()) {
308 auto result = iter.awaitNext();
309 EXPECT_EQ(2 * iter.getTaskID() + 1, *result);
310 EXPECT_GE(2 - n, pendingFibers.size());
316 } else if (pendingFibers.size()) {
317 pendingFibers.back().setValue(0);
318 pendingFibers.pop_back();
320 loopController.stop();
324 loopController.loop(std::move(loopFunc));
327 TEST(FiberManager, awaitThrow) {
328 folly::EventBase evb;
329 struct ExpectedException {};
333 await([](Promise<int> p) {
335 throw ExpectedException();
341 await([&](Promise<int> p) {
342 evb.runInEventBaseThread([p = std::move(p)]() mutable {
345 throw ExpectedException();
352 TEST(FiberManager, addTasksThrow) {
353 std::vector<Promise<int>> pendingFibers;
354 bool taskAdded = false;
356 FiberManager manager(folly::make_unique<SimpleLoopController>());
357 auto& loopController =
358 dynamic_cast<SimpleLoopController&>(manager.loopController());
360 auto loopFunc = [&]() {
362 manager.addTask([&]() {
363 std::vector<std::function<int()>> funcs;
364 for (size_t i = 0; i < 3; ++i) {
365 funcs.push_back([i, &pendingFibers]() {
366 await([&pendingFibers](Promise<int> promise) {
367 pendingFibers.push_back(std::move(promise));
370 throw std::runtime_error("Runtime");
376 auto iter = addTasks(funcs.begin(), funcs.end());
379 while (iter.hasNext()) {
381 int result = iter.awaitNext();
382 EXPECT_EQ(1, iter.getTaskID() % 2);
383 EXPECT_EQ(2 * iter.getTaskID() + 1, result);
385 EXPECT_EQ(0, iter.getTaskID() % 2);
387 EXPECT_GE(2 - n, pendingFibers.size());
393 } else if (pendingFibers.size()) {
394 pendingFibers.back().setValue(0);
395 pendingFibers.pop_back();
397 loopController.stop();
401 loopController.loop(std::move(loopFunc));
404 TEST(FiberManager, addTasksVoid) {
405 std::vector<Promise<int>> pendingFibers;
406 bool taskAdded = false;
408 FiberManager manager(folly::make_unique<SimpleLoopController>());
409 auto& loopController =
410 dynamic_cast<SimpleLoopController&>(manager.loopController());
412 auto loopFunc = [&]() {
414 manager.addTask([&]() {
415 std::vector<std::function<void()>> funcs;
416 for (size_t i = 0; i < 3; ++i) {
417 funcs.push_back([i, &pendingFibers]() {
418 await([&pendingFibers](Promise<int> promise) {
419 pendingFibers.push_back(std::move(promise));
424 auto iter = addTasks(funcs.begin(), funcs.end());
427 while (iter.hasNext()) {
429 EXPECT_GE(2 - n, pendingFibers.size());
435 } else if (pendingFibers.size()) {
436 pendingFibers.back().setValue(0);
437 pendingFibers.pop_back();
439 loopController.stop();
443 loopController.loop(std::move(loopFunc));
446 TEST(FiberManager, addTasksVoidThrow) {
447 std::vector<Promise<int>> pendingFibers;
448 bool taskAdded = false;
450 FiberManager manager(folly::make_unique<SimpleLoopController>());
451 auto& loopController =
452 dynamic_cast<SimpleLoopController&>(manager.loopController());
454 auto loopFunc = [&]() {
456 manager.addTask([&]() {
457 std::vector<std::function<void()>> funcs;
458 for (size_t i = 0; i < 3; ++i) {
459 funcs.push_back([i, &pendingFibers]() {
460 await([&pendingFibers](Promise<int> promise) {
461 pendingFibers.push_back(std::move(promise));
464 throw std::runtime_error("");
469 auto iter = addTasks(funcs.begin(), funcs.end());
472 while (iter.hasNext()) {
475 EXPECT_EQ(1, iter.getTaskID() % 2);
477 EXPECT_EQ(0, iter.getTaskID() % 2);
479 EXPECT_GE(2 - n, pendingFibers.size());
485 } else if (pendingFibers.size()) {
486 pendingFibers.back().setValue(0);
487 pendingFibers.pop_back();
489 loopController.stop();
493 loopController.loop(std::move(loopFunc));
496 TEST(FiberManager, addTasksReserve) {
497 std::vector<Promise<int>> pendingFibers;
498 bool taskAdded = false;
500 FiberManager manager(folly::make_unique<SimpleLoopController>());
501 auto& loopController =
502 dynamic_cast<SimpleLoopController&>(manager.loopController());
504 auto loopFunc = [&]() {
506 manager.addTask([&]() {
507 std::vector<std::function<void()>> funcs;
508 for (size_t i = 0; i < 3; ++i) {
509 funcs.push_back([&pendingFibers]() {
510 await([&pendingFibers](Promise<int> promise) {
511 pendingFibers.push_back(std::move(promise));
516 auto iter = addTasks(funcs.begin(), funcs.end());
519 EXPECT_TRUE(iter.hasCompleted());
520 EXPECT_TRUE(iter.hasPending());
521 EXPECT_TRUE(iter.hasNext());
524 EXPECT_TRUE(iter.hasCompleted());
525 EXPECT_TRUE(iter.hasPending());
526 EXPECT_TRUE(iter.hasNext());
529 EXPECT_FALSE(iter.hasCompleted());
530 EXPECT_TRUE(iter.hasPending());
531 EXPECT_TRUE(iter.hasNext());
534 EXPECT_FALSE(iter.hasCompleted());
535 EXPECT_FALSE(iter.hasPending());
536 EXPECT_FALSE(iter.hasNext());
539 } else if (pendingFibers.size()) {
540 pendingFibers.back().setValue(0);
541 pendingFibers.pop_back();
543 loopController.stop();
547 loopController.loop(std::move(loopFunc));
550 TEST(FiberManager, addTaskDynamic) {
551 folly::EventBase evb;
555 auto makeTask = [&](size_t taskId) {
556 return [&, taskId]() -> size_t {
557 batons[taskId].wait();
563 .addTaskFuture([&]() {
564 TaskIterator<size_t> iterator;
566 iterator.addTask(makeTask(0));
567 iterator.addTask(makeTask(1));
571 EXPECT_EQ(1, iterator.awaitNext());
573 iterator.addTask(makeTask(2));
577 EXPECT_EQ(2, iterator.awaitNext());
581 EXPECT_EQ(0, iterator.awaitNext());
586 TEST(FiberManager, forEach) {
587 std::vector<Promise<int>> pendingFibers;
588 bool taskAdded = false;
590 FiberManager manager(folly::make_unique<SimpleLoopController>());
591 auto& loopController =
592 dynamic_cast<SimpleLoopController&>(manager.loopController());
594 auto loopFunc = [&]() {
596 manager.addTask([&]() {
597 std::vector<std::function<int()>> funcs;
598 for (size_t i = 0; i < 3; ++i) {
599 funcs.push_back([i, &pendingFibers]() {
600 await([&pendingFibers](Promise<int> promise) {
601 pendingFibers.push_back(std::move(promise));
607 std::vector<std::pair<size_t, int>> results;
608 forEach(funcs.begin(), funcs.end(), [&results](size_t id, int result) {
609 results.emplace_back(id, result);
611 EXPECT_EQ(3, results.size());
612 EXPECT_TRUE(pendingFibers.empty());
613 for (size_t i = 0; i < 3; ++i) {
614 EXPECT_EQ(results[i].first * 2 + 1, results[i].second);
618 } else if (pendingFibers.size()) {
619 pendingFibers.back().setValue(0);
620 pendingFibers.pop_back();
622 loopController.stop();
626 loopController.loop(std::move(loopFunc));
629 TEST(FiberManager, collectN) {
630 std::vector<Promise<int>> pendingFibers;
631 bool taskAdded = false;
633 FiberManager manager(folly::make_unique<SimpleLoopController>());
634 auto& loopController =
635 dynamic_cast<SimpleLoopController&>(manager.loopController());
637 auto loopFunc = [&]() {
639 manager.addTask([&]() {
640 std::vector<std::function<int()>> funcs;
641 for (size_t i = 0; i < 3; ++i) {
642 funcs.push_back([i, &pendingFibers]() {
643 await([&pendingFibers](Promise<int> promise) {
644 pendingFibers.push_back(std::move(promise));
650 auto results = collectN(funcs.begin(), funcs.end(), 2);
651 EXPECT_EQ(2, results.size());
652 EXPECT_EQ(1, pendingFibers.size());
653 for (size_t i = 0; i < 2; ++i) {
654 EXPECT_EQ(results[i].first * 2 + 1, results[i].second);
658 } else if (pendingFibers.size()) {
659 pendingFibers.back().setValue(0);
660 pendingFibers.pop_back();
662 loopController.stop();
666 loopController.loop(std::move(loopFunc));
669 TEST(FiberManager, collectNThrow) {
670 std::vector<Promise<int>> pendingFibers;
671 bool taskAdded = false;
673 FiberManager manager(folly::make_unique<SimpleLoopController>());
674 auto& loopController =
675 dynamic_cast<SimpleLoopController&>(manager.loopController());
677 auto loopFunc = [&]() {
679 manager.addTask([&]() {
680 std::vector<std::function<int()>> funcs;
681 for (size_t i = 0; i < 3; ++i) {
682 funcs.push_back([i, &pendingFibers]() -> size_t {
683 await([&pendingFibers](Promise<int> promise) {
684 pendingFibers.push_back(std::move(promise));
686 throw std::runtime_error("Runtime");
691 collectN(funcs.begin(), funcs.end(), 2);
693 EXPECT_EQ(1, pendingFibers.size());
697 } else if (pendingFibers.size()) {
698 pendingFibers.back().setValue(0);
699 pendingFibers.pop_back();
701 loopController.stop();
705 loopController.loop(std::move(loopFunc));
708 TEST(FiberManager, collectNVoid) {
709 std::vector<Promise<int>> pendingFibers;
710 bool taskAdded = false;
712 FiberManager manager(folly::make_unique<SimpleLoopController>());
713 auto& loopController =
714 dynamic_cast<SimpleLoopController&>(manager.loopController());
716 auto loopFunc = [&]() {
718 manager.addTask([&]() {
719 std::vector<std::function<void()>> funcs;
720 for (size_t i = 0; i < 3; ++i) {
721 funcs.push_back([i, &pendingFibers]() {
722 await([&pendingFibers](Promise<int> promise) {
723 pendingFibers.push_back(std::move(promise));
728 auto results = collectN(funcs.begin(), funcs.end(), 2);
729 EXPECT_EQ(2, results.size());
730 EXPECT_EQ(1, pendingFibers.size());
733 } else if (pendingFibers.size()) {
734 pendingFibers.back().setValue(0);
735 pendingFibers.pop_back();
737 loopController.stop();
741 loopController.loop(std::move(loopFunc));
744 TEST(FiberManager, collectNVoidThrow) {
745 std::vector<Promise<int>> pendingFibers;
746 bool taskAdded = false;
748 FiberManager manager(folly::make_unique<SimpleLoopController>());
749 auto& loopController =
750 dynamic_cast<SimpleLoopController&>(manager.loopController());
752 auto loopFunc = [&]() {
754 manager.addTask([&]() {
755 std::vector<std::function<void()>> funcs;
756 for (size_t i = 0; i < 3; ++i) {
757 funcs.push_back([i, &pendingFibers]() {
758 await([&pendingFibers](Promise<int> promise) {
759 pendingFibers.push_back(std::move(promise));
761 throw std::runtime_error("Runtime");
766 collectN(funcs.begin(), funcs.end(), 2);
768 EXPECT_EQ(1, pendingFibers.size());
772 } else if (pendingFibers.size()) {
773 pendingFibers.back().setValue(0);
774 pendingFibers.pop_back();
776 loopController.stop();
780 loopController.loop(std::move(loopFunc));
783 TEST(FiberManager, collectAll) {
784 std::vector<Promise<int>> pendingFibers;
785 bool taskAdded = false;
787 FiberManager manager(folly::make_unique<SimpleLoopController>());
788 auto& loopController =
789 dynamic_cast<SimpleLoopController&>(manager.loopController());
791 auto loopFunc = [&]() {
793 manager.addTask([&]() {
794 std::vector<std::function<int()>> funcs;
795 for (size_t i = 0; i < 3; ++i) {
796 funcs.push_back([i, &pendingFibers]() {
797 await([&pendingFibers](Promise<int> promise) {
798 pendingFibers.push_back(std::move(promise));
804 auto results = collectAll(funcs.begin(), funcs.end());
805 EXPECT_TRUE(pendingFibers.empty());
806 for (size_t i = 0; i < 3; ++i) {
807 EXPECT_EQ(i * 2 + 1, results[i]);
811 } else if (pendingFibers.size()) {
812 pendingFibers.back().setValue(0);
813 pendingFibers.pop_back();
815 loopController.stop();
819 loopController.loop(std::move(loopFunc));
822 TEST(FiberManager, collectAllVoid) {
823 std::vector<Promise<int>> pendingFibers;
824 bool taskAdded = false;
826 FiberManager manager(folly::make_unique<SimpleLoopController>());
827 auto& loopController =
828 dynamic_cast<SimpleLoopController&>(manager.loopController());
830 auto loopFunc = [&]() {
832 manager.addTask([&]() {
833 std::vector<std::function<void()>> funcs;
834 for (size_t i = 0; i < 3; ++i) {
835 funcs.push_back([i, &pendingFibers]() {
836 await([&pendingFibers](Promise<int> promise) {
837 pendingFibers.push_back(std::move(promise));
842 collectAll(funcs.begin(), funcs.end());
843 EXPECT_TRUE(pendingFibers.empty());
846 } else if (pendingFibers.size()) {
847 pendingFibers.back().setValue(0);
848 pendingFibers.pop_back();
850 loopController.stop();
854 loopController.loop(std::move(loopFunc));
857 TEST(FiberManager, collectAny) {
858 std::vector<Promise<int>> pendingFibers;
859 bool taskAdded = false;
861 FiberManager manager(folly::make_unique<SimpleLoopController>());
862 auto& loopController =
863 dynamic_cast<SimpleLoopController&>(manager.loopController());
865 auto loopFunc = [&]() {
867 manager.addTask([&]() {
868 std::vector<std::function<int()>> funcs;
869 for (size_t i = 0; i < 3; ++i) {
870 funcs.push_back([i, &pendingFibers]() {
871 await([&pendingFibers](Promise<int> promise) {
872 pendingFibers.push_back(std::move(promise));
875 throw std::runtime_error("This exception will be ignored");
881 auto result = collectAny(funcs.begin(), funcs.end());
882 EXPECT_EQ(2, pendingFibers.size());
883 EXPECT_EQ(2, result.first);
884 EXPECT_EQ(2 * 2 + 1, result.second);
887 } else if (pendingFibers.size()) {
888 pendingFibers.back().setValue(0);
889 pendingFibers.pop_back();
891 loopController.stop();
895 loopController.loop(std::move(loopFunc));
899 /* Checks that this function was run from a main context,
900 by comparing an address on a stack to a known main stack address
901 and a known related fiber stack address. The assumption
902 is that fiber stack and main stack will be far enough apart,
903 while any two values on the same stack will be close. */
904 void expectMainContext(bool& ran, int* mainLocation, int* fiberLocation) {
906 /* 2 pages is a good guess */
907 constexpr ssize_t DISTANCE = 0x2000 / sizeof(int);
909 EXPECT_TRUE(std::abs(&here - fiberLocation) > DISTANCE);
912 EXPECT_TRUE(std::abs(&here - mainLocation) < DISTANCE);
920 TEST(FiberManager, runInMainContext) {
921 FiberManager manager(folly::make_unique<SimpleLoopController>());
922 auto& loopController =
923 dynamic_cast<SimpleLoopController&>(manager.loopController());
925 bool checkRan = false;
928 manager.runInMainContext(
929 [&]() { expectMainContext(checkRan, &mainLocation, nullptr); });
930 EXPECT_TRUE(checkRan);
934 manager.addTask([&]() {
936 explicit A(int value_) : value(value_) {}
937 A(const A&) = delete;
943 auto ret = runInMainContext([&]() {
944 expectMainContext(checkRan, &mainLocation, &stackLocation);
947 EXPECT_TRUE(checkRan);
948 EXPECT_EQ(42, ret.value);
951 loopController.loop([&]() { loopController.stop(); });
953 EXPECT_TRUE(checkRan);
956 TEST(FiberManager, addTaskFinally) {
957 FiberManager manager(folly::make_unique<SimpleLoopController>());
958 auto& loopController =
959 dynamic_cast<SimpleLoopController&>(manager.loopController());
961 bool checkRan = false;
965 manager.addTaskFinally(
966 [&]() { return 1234; },
967 [&](Try<int>&& result) {
968 EXPECT_EQ(result.value(), 1234);
970 expectMainContext(checkRan, &mainLocation, nullptr);
973 EXPECT_FALSE(checkRan);
975 loopController.loop([&]() { loopController.stop(); });
977 EXPECT_TRUE(checkRan);
980 TEST(FiberManager, fibersPoolWithinLimit) {
981 FiberManager::Options opts;
982 opts.maxFibersPoolSize = 5;
984 FiberManager manager(folly::make_unique<SimpleLoopController>(), opts);
985 auto& loopController =
986 dynamic_cast<SimpleLoopController&>(manager.loopController());
988 size_t fibersRun = 0;
990 for (size_t i = 0; i < 5; ++i) {
991 manager.addTask([&]() { ++fibersRun; });
993 loopController.loop([&]() { loopController.stop(); });
995 EXPECT_EQ(5, fibersRun);
996 EXPECT_EQ(5, manager.fibersAllocated());
997 EXPECT_EQ(5, manager.fibersPoolSize());
999 for (size_t i = 0; i < 5; ++i) {
1000 manager.addTask([&]() { ++fibersRun; });
1002 loopController.loop([&]() { loopController.stop(); });
1004 EXPECT_EQ(10, fibersRun);
1005 EXPECT_EQ(5, manager.fibersAllocated());
1006 EXPECT_EQ(5, manager.fibersPoolSize());
1009 TEST(FiberManager, fibersPoolOverLimit) {
1010 FiberManager::Options opts;
1011 opts.maxFibersPoolSize = 5;
1013 FiberManager manager(folly::make_unique<SimpleLoopController>(), opts);
1014 auto& loopController =
1015 dynamic_cast<SimpleLoopController&>(manager.loopController());
1017 size_t fibersRun = 0;
1019 for (size_t i = 0; i < 10; ++i) {
1020 manager.addTask([&]() { ++fibersRun; });
1023 EXPECT_EQ(0, fibersRun);
1024 EXPECT_EQ(10, manager.fibersAllocated());
1025 EXPECT_EQ(0, manager.fibersPoolSize());
1027 loopController.loop([&]() { loopController.stop(); });
1029 EXPECT_EQ(10, fibersRun);
1030 EXPECT_EQ(5, manager.fibersAllocated());
1031 EXPECT_EQ(5, manager.fibersPoolSize());
1034 TEST(FiberManager, remoteFiberBasic) {
1035 FiberManager manager(folly::make_unique<SimpleLoopController>());
1036 auto& loopController =
1037 dynamic_cast<SimpleLoopController&>(manager.loopController());
1040 result[0] = result[1] = 0;
1041 folly::Optional<Promise<int>> savedPromise[2];
1042 manager.addTask([&]() {
1044 [&](Promise<int> promise) { savedPromise[0] = std::move(promise); });
1046 manager.addTask([&]() {
1048 [&](Promise<int> promise) { savedPromise[1] = std::move(promise); });
1051 manager.loopUntilNoReady();
1053 EXPECT_TRUE(savedPromise[0].hasValue());
1054 EXPECT_TRUE(savedPromise[1].hasValue());
1055 EXPECT_EQ(0, result[0]);
1056 EXPECT_EQ(0, result[1]);
1058 std::thread remoteThread0{[&]() { savedPromise[0]->setValue(42); }};
1059 std::thread remoteThread1{[&]() { savedPromise[1]->setValue(43); }};
1060 remoteThread0.join();
1061 remoteThread1.join();
1062 EXPECT_EQ(0, result[0]);
1063 EXPECT_EQ(0, result[1]);
1064 /* Should only have scheduled once */
1065 EXPECT_EQ(1, loopController.remoteScheduleCalled());
1067 manager.loopUntilNoReady();
1068 EXPECT_EQ(42, result[0]);
1069 EXPECT_EQ(43, result[1]);
1072 TEST(FiberManager, addTaskRemoteBasic) {
1073 FiberManager manager(folly::make_unique<SimpleLoopController>());
1076 result[0] = result[1] = 0;
1077 folly::Optional<Promise<int>> savedPromise[2];
1079 std::thread remoteThread0{[&]() {
1080 manager.addTaskRemote([&]() {
1082 [&](Promise<int> promise) { savedPromise[0] = std::move(promise); });
1085 std::thread remoteThread1{[&]() {
1086 manager.addTaskRemote([&]() {
1088 [&](Promise<int> promise) { savedPromise[1] = std::move(promise); });
1091 remoteThread0.join();
1092 remoteThread1.join();
1094 manager.loopUntilNoReady();
1096 EXPECT_TRUE(savedPromise[0].hasValue());
1097 EXPECT_TRUE(savedPromise[1].hasValue());
1098 EXPECT_EQ(0, result[0]);
1099 EXPECT_EQ(0, result[1]);
1101 savedPromise[0]->setValue(42);
1102 savedPromise[1]->setValue(43);
1104 EXPECT_EQ(0, result[0]);
1105 EXPECT_EQ(0, result[1]);
1107 manager.loopUntilNoReady();
1108 EXPECT_EQ(42, result[0]);
1109 EXPECT_EQ(43, result[1]);
1112 TEST(FiberManager, remoteHasTasks) {
1114 FiberManager fm(folly::make_unique<SimpleLoopController>());
1115 std::thread remote([&]() { fm.addTaskRemote([&]() { ++counter; }); });
1119 while (fm.hasTasks()) {
1120 fm.loopUntilNoReady();
1123 EXPECT_FALSE(fm.hasTasks());
1124 EXPECT_EQ(counter, 1);
1127 TEST(FiberManager, remoteHasReadyTasks) {
1129 folly::Optional<Promise<int>> savedPromise;
1130 FiberManager fm(folly::make_unique<SimpleLoopController>());
1131 std::thread remote([&]() {
1132 fm.addTaskRemote([&]() {
1134 [&](Promise<int> promise) { savedPromise = std::move(promise); });
1135 EXPECT_TRUE(fm.hasTasks());
1140 EXPECT_TRUE(fm.hasTasks());
1142 fm.loopUntilNoReady();
1143 EXPECT_TRUE(fm.hasTasks());
1145 std::thread remote2([&]() { savedPromise->setValue(47); });
1147 EXPECT_TRUE(fm.hasTasks());
1149 fm.loopUntilNoReady();
1150 EXPECT_FALSE(fm.hasTasks());
1152 EXPECT_EQ(result, 47);
1155 template <typename Data>
1156 void testFiberLocal() {
1158 LocalType<Data>(), folly::make_unique<SimpleLoopController>());
1161 EXPECT_EQ(42, local<Data>().value);
1163 local<Data>().value = 43;
1166 EXPECT_EQ(43, local<Data>().value);
1168 local<Data>().value = 44;
1170 addTask([]() { EXPECT_EQ(44, local<Data>().value); });
1175 EXPECT_EQ(42, local<Data>().value);
1177 local<Data>().value = 43;
1179 fm.addTaskRemote([]() { EXPECT_EQ(43, local<Data>().value); });
1183 EXPECT_EQ(42, local<Data>().value);
1184 local<Data>().value = 43;
1187 EXPECT_EQ(43, local<Data>().value);
1188 local<Data>().value = 44;
1190 std::vector<std::function<void()>> tasks{task};
1191 collectAny(tasks.begin(), tasks.end());
1193 EXPECT_EQ(43, local<Data>().value);
1196 fm.loopUntilNoReady();
1197 EXPECT_FALSE(fm.hasTasks());
1200 TEST(FiberManager, fiberLocal) {
1205 testFiberLocal<SimpleData>();
1208 TEST(FiberManager, fiberLocalHeap) {
1210 char _[1024 * 1024];
1214 testFiberLocal<LargeData>();
1217 TEST(FiberManager, fiberLocalDestructor) {
1224 EXPECT_EQ(42, local<CrazyData>().data);
1225 // Make sure we don't have infinite loop
1226 local<CrazyData>().data = 0;
1233 LocalType<CrazyData>(), folly::make_unique<SimpleLoopController>());
1235 fm.addTask([]() { local<CrazyData>().data = 41; });
1237 fm.loopUntilNoReady();
1238 EXPECT_FALSE(fm.hasTasks());
1241 TEST(FiberManager, yieldTest) {
1242 FiberManager manager(folly::make_unique<SimpleLoopController>());
1243 auto& loopController =
1244 dynamic_cast<SimpleLoopController&>(manager.loopController());
1246 bool checkRan = false;
1248 manager.addTask([&]() {
1253 loopController.loop([&]() {
1255 loopController.stop();
1259 EXPECT_TRUE(checkRan);
1262 TEST(FiberManager, RequestContext) {
1263 FiberManager fm(folly::make_unique<SimpleLoopController>());
1265 bool checkRun1 = false;
1266 bool checkRun2 = false;
1267 bool checkRun3 = false;
1268 bool checkRun4 = false;
1269 folly::fibers::Baton baton1;
1270 folly::fibers::Baton baton2;
1271 folly::fibers::Baton baton3;
1272 folly::fibers::Baton baton4;
1275 folly::RequestContextScopeGuard rctx;
1276 auto rcontext1 = folly::RequestContext::get();
1277 fm.addTask([&, rcontext1]() {
1278 EXPECT_EQ(rcontext1, folly::RequestContext::get());
1280 [&]() { EXPECT_EQ(rcontext1, folly::RequestContext::get()); });
1281 EXPECT_EQ(rcontext1, folly::RequestContext::get());
1283 [&]() { EXPECT_EQ(rcontext1, folly::RequestContext::get()); });
1288 folly::RequestContextScopeGuard rctx;
1289 auto rcontext2 = folly::RequestContext::get();
1290 fm.addTaskRemote([&, rcontext2]() {
1291 EXPECT_EQ(rcontext2, folly::RequestContext::get());
1293 EXPECT_EQ(rcontext2, folly::RequestContext::get());
1298 folly::RequestContextScopeGuard rctx;
1299 auto rcontext3 = folly::RequestContext::get();
1302 EXPECT_EQ(rcontext3, folly::RequestContext::get());
1304 EXPECT_EQ(rcontext3, folly::RequestContext::get());
1306 return folly::Unit();
1308 [&, rcontext3](Try<folly::Unit>&& /* t */) {
1309 EXPECT_EQ(rcontext3, folly::RequestContext::get());
1314 folly::RequestContext::setContext(nullptr);
1316 folly::RequestContextScopeGuard rctx;
1317 auto rcontext4 = folly::RequestContext::get();
1319 EXPECT_EQ(rcontext4, folly::RequestContext::get());
1324 folly::RequestContextScopeGuard rctx;
1325 auto rcontext = folly::RequestContext::get();
1327 fm.loopUntilNoReady();
1328 EXPECT_EQ(rcontext, folly::RequestContext::get());
1331 EXPECT_EQ(rcontext, folly::RequestContext::get());
1332 fm.loopUntilNoReady();
1333 EXPECT_TRUE(checkRun1);
1334 EXPECT_EQ(rcontext, folly::RequestContext::get());
1337 EXPECT_EQ(rcontext, folly::RequestContext::get());
1338 fm.loopUntilNoReady();
1339 EXPECT_TRUE(checkRun2);
1340 EXPECT_EQ(rcontext, folly::RequestContext::get());
1343 EXPECT_EQ(rcontext, folly::RequestContext::get());
1344 fm.loopUntilNoReady();
1345 EXPECT_TRUE(checkRun3);
1346 EXPECT_EQ(rcontext, folly::RequestContext::get());
1349 EXPECT_EQ(rcontext, folly::RequestContext::get());
1350 fm.loopUntilNoReady();
1351 EXPECT_TRUE(checkRun4);
1352 EXPECT_EQ(rcontext, folly::RequestContext::get());
1356 TEST(FiberManager, resizePeriodically) {
1357 FiberManager::Options opts;
1358 opts.fibersPoolResizePeriodMs = 300;
1359 opts.maxFibersPoolSize = 5;
1361 FiberManager manager(folly::make_unique<EventBaseLoopController>(), opts);
1363 folly::EventBase evb;
1364 dynamic_cast<EventBaseLoopController&>(manager.loopController())
1365 .attachEventBase(evb);
1367 std::vector<Baton> batons(10);
1369 size_t tasksRun = 0;
1370 for (size_t i = 0; i < 30; ++i) {
1371 manager.addTask([i, &batons, &tasksRun]() {
1373 // Keep some fibers active indefinitely
1374 if (i < batons.size()) {
1380 EXPECT_EQ(0, tasksRun);
1381 EXPECT_EQ(30, manager.fibersAllocated());
1382 EXPECT_EQ(0, manager.fibersPoolSize());
1385 EXPECT_EQ(30, tasksRun);
1386 EXPECT_EQ(30, manager.fibersAllocated());
1387 // Can go over maxFibersPoolSize, 10 of 30 fibers still active
1388 EXPECT_EQ(20, manager.fibersPoolSize());
1390 std::this_thread::sleep_for(std::chrono::milliseconds(400));
1391 evb.loopOnce(); // no fibers active in this period
1392 EXPECT_EQ(30, manager.fibersAllocated());
1393 EXPECT_EQ(20, manager.fibersPoolSize());
1395 std::this_thread::sleep_for(std::chrono::milliseconds(400));
1396 evb.loopOnce(); // should shrink fibers pool to maxFibersPoolSize
1397 EXPECT_EQ(15, manager.fibersAllocated());
1398 EXPECT_EQ(5, manager.fibersPoolSize());
1400 for (size_t i = 0; i < batons.size(); ++i) {
1404 EXPECT_EQ(15, manager.fibersAllocated());
1405 EXPECT_EQ(15, manager.fibersPoolSize());
1407 std::this_thread::sleep_for(std::chrono::milliseconds(400));
1408 evb.loopOnce(); // 10 fibers active in last period
1409 EXPECT_EQ(10, manager.fibersAllocated());
1410 EXPECT_EQ(10, manager.fibersPoolSize());
1412 std::this_thread::sleep_for(std::chrono::milliseconds(400));
1414 EXPECT_EQ(5, manager.fibersAllocated());
1415 EXPECT_EQ(5, manager.fibersPoolSize());
1418 TEST(FiberManager, batonWaitTimeoutHandler) {
1419 FiberManager manager(folly::make_unique<EventBaseLoopController>());
1421 folly::EventBase evb;
1422 dynamic_cast<EventBaseLoopController&>(manager.loopController())
1423 .attachEventBase(evb);
1425 size_t fibersRun = 0;
1427 Baton::TimeoutHandler timeoutHandler;
1429 manager.addTask([&]() {
1430 baton.wait(timeoutHandler);
1433 manager.loopUntilNoReady();
1435 EXPECT_FALSE(baton.try_wait());
1436 EXPECT_EQ(0, fibersRun);
1438 timeoutHandler.scheduleTimeout(std::chrono::milliseconds(250));
1439 std::this_thread::sleep_for(std::chrono::milliseconds(500));
1441 EXPECT_FALSE(baton.try_wait());
1442 EXPECT_EQ(0, fibersRun);
1445 manager.loopUntilNoReady();
1447 EXPECT_EQ(1, fibersRun);
1450 TEST(FiberManager, batonWaitTimeoutMany) {
1451 FiberManager manager(folly::make_unique<EventBaseLoopController>());
1453 folly::EventBase evb;
1454 dynamic_cast<EventBaseLoopController&>(manager.loopController())
1455 .attachEventBase(evb);
1457 constexpr size_t kNumTimeoutTasks = 10000;
1458 size_t tasksCount = kNumTimeoutTasks;
1460 // We add many tasks to hit timeout queue deallocation logic.
1461 for (size_t i = 0; i < kNumTimeoutTasks; ++i) {
1462 manager.addTask([&]() {
1464 Baton::TimeoutHandler timeoutHandler;
1466 folly::fibers::addTask([&] {
1467 timeoutHandler.scheduleTimeout(std::chrono::milliseconds(1000));
1470 baton.wait(timeoutHandler);
1471 if (--tasksCount == 0) {
1472 evb.terminateLoopSoon();
1480 TEST(FiberManager, remoteFutureTest) {
1481 FiberManager fiberManager(folly::make_unique<SimpleLoopController>());
1482 auto& loopController =
1483 dynamic_cast<SimpleLoopController&>(fiberManager.loopController());
1487 auto f1 = fiberManager.addTaskFuture([&]() { return testValue1; });
1488 auto f2 = fiberManager.addTaskRemoteFuture([&]() { return testValue2; });
1489 loopController.loop([&]() { loopController.stop(); });
1493 EXPECT_EQ(v1, testValue1);
1494 EXPECT_EQ(v2, testValue2);
1497 // Test that a void function produes a Future<Unit>.
1498 TEST(FiberManager, remoteFutureVoidUnitTest) {
1499 FiberManager fiberManager(folly::make_unique<SimpleLoopController>());
1500 auto& loopController =
1501 dynamic_cast<SimpleLoopController&>(fiberManager.loopController());
1503 bool ranLocal = false;
1504 folly::Future<folly::Unit> futureLocal =
1505 fiberManager.addTaskFuture([&]() { ranLocal = true; });
1507 bool ranRemote = false;
1508 folly::Future<folly::Unit> futureRemote =
1509 fiberManager.addTaskRemoteFuture([&]() { ranRemote = true; });
1511 loopController.loop([&]() { loopController.stop(); });
1514 ASSERT_TRUE(ranLocal);
1516 futureRemote.wait();
1517 ASSERT_TRUE(ranRemote);
1520 TEST(FiberManager, nestedFiberManagers) {
1521 folly::EventBase outerEvb;
1522 folly::EventBase innerEvb;
1524 getFiberManager(outerEvb).addTask([&]() {
1526 &getFiberManager(outerEvb), FiberManager::getFiberManagerUnsafe());
1528 runInMainContext([&]() {
1529 getFiberManager(innerEvb).addTask([&]() {
1531 &getFiberManager(innerEvb), FiberManager::getFiberManagerUnsafe());
1533 innerEvb.terminateLoopSoon();
1536 innerEvb.loopForever();
1540 &getFiberManager(outerEvb), FiberManager::getFiberManagerUnsafe());
1542 outerEvb.terminateLoopSoon();
1545 outerEvb.loopForever();
1548 TEST(FiberManager, semaphore) {
1549 constexpr size_t kTasks = 10;
1550 constexpr size_t kIterations = 10000;
1551 constexpr size_t kNumTokens = 10;
1553 Semaphore sem(kNumTokens);
1557 auto task = [&sem, kTasks, kIterations, kNumTokens](
1558 int& counter, folly::fibers::Baton& baton) {
1559 FiberManager manager(folly::make_unique<EventBaseLoopController>());
1560 folly::EventBase evb;
1561 dynamic_cast<EventBaseLoopController&>(manager.loopController())
1562 .attachEventBase(evb);
1565 std::shared_ptr<folly::EventBase> completionCounter(
1566 &evb, [](folly::EventBase* evb) { evb->terminateLoopSoon(); });
1568 for (size_t i = 0; i < kTasks; ++i) {
1569 manager.addTask([&, completionCounter]() {
1570 for (size_t j = 0; j < kIterations; ++j) {
1576 EXPECT_LT(counter, kNumTokens);
1577 EXPECT_GE(counter, 0);
1587 folly::fibers::Baton batonA;
1588 folly::fibers::Baton batonB;
1589 std::thread threadA([&] { task(counterA, batonA); });
1590 std::thread threadB([&] { task(counterB, batonB); });
1597 EXPECT_LT(counterA, kNumTokens);
1598 EXPECT_LT(counterB, kNumTokens);
1599 EXPECT_GE(counterA, 0);
1600 EXPECT_GE(counterB, 0);
1603 template <typename ExecutorT>
1604 void singleBatchDispatch(ExecutorT& executor, int batchSize, int index) {
1605 thread_local BatchDispatcher<int, std::string, ExecutorT> batchDispatcher(
1606 executor, [=](std::vector<int>&& batch) {
1607 EXPECT_EQ(batchSize, batch.size());
1608 std::vector<std::string> results;
1609 for (auto& it : batch) {
1610 results.push_back(folly::to<std::string>(it));
1615 auto indexCopy = index;
1616 auto result = batchDispatcher.add(std::move(indexCopy));
1617 EXPECT_EQ(folly::to<std::string>(index), result.get());
1620 TEST(FiberManager, batchDispatchTest) {
1621 folly::EventBase evb;
1622 auto& executor = getFiberManager(evb);
1624 // Launch multiple fibers with a single id.
1625 executor.add([&]() {
1627 for (int i = 0; i < batchSize; i++) {
1629 [=, &executor]() { singleBatchDispatch(executor, batchSize, i); });
1634 // Reuse the same BatchDispatcher to batch once again.
1635 executor.add([&]() {
1637 for (int i = 0; i < batchSize; i++) {
1639 [=, &executor]() { singleBatchDispatch(executor, batchSize, i); });
1645 template <typename ExecutorT>
1646 folly::Future<std::vector<std::string>> doubleBatchInnerDispatch(
1647 ExecutorT& executor,
1648 int totalNumberOfElements,
1649 std::vector<int> input) {
1650 thread_local BatchDispatcher<
1652 std::vector<std::string>,
1654 batchDispatcher(executor, [=](std::vector<std::vector<int>>&& batch) {
1655 std::vector<std::vector<std::string>> results;
1656 int numberOfElements = 0;
1657 for (auto& unit : batch) {
1658 numberOfElements += unit.size();
1659 std::vector<std::string> result;
1660 for (auto& element : unit) {
1661 result.push_back(folly::to<std::string>(element));
1663 results.push_back(std::move(result));
1665 EXPECT_EQ(totalNumberOfElements, numberOfElements);
1669 return batchDispatcher.add(std::move(input));
1673 * Batch values in groups of 5, and then call inner dispatch.
1675 template <typename ExecutorT>
1676 void doubleBatchOuterDispatch(
1677 ExecutorT& executor,
1678 int totalNumberOfElements,
1680 thread_local BatchDispatcher<int, std::string, ExecutorT>
1681 batchDispatcher(executor, [=, &executor](std::vector<int>&& batch) {
1682 EXPECT_EQ(totalNumberOfElements, batch.size());
1683 std::vector<std::string> results;
1684 std::vector<folly::Future<std::vector<std::string>>>
1685 innerDispatchResultFutures;
1687 std::vector<int> group;
1688 for (auto unit : batch) {
1689 group.push_back(unit);
1690 if (group.size() == 5) {
1691 auto localGroup = group;
1694 innerDispatchResultFutures.push_back(doubleBatchInnerDispatch(
1695 executor, totalNumberOfElements, localGroup));
1700 innerDispatchResultFutures.begin(), innerDispatchResultFutures.end())
1702 std::vector<Try<std::vector<std::string>>> innerDispatchResults) {
1703 for (auto& unit : innerDispatchResults) {
1704 for (auto& element : unit.value()) {
1705 results.push_back(element);
1713 auto indexCopy = index;
1714 auto result = batchDispatcher.add(std::move(indexCopy));
1715 EXPECT_EQ(folly::to<std::string>(index), result.get());
1718 TEST(FiberManager, doubleBatchDispatchTest) {
1719 folly::EventBase evb;
1720 auto& executor = getFiberManager(evb);
1722 // Launch multiple fibers with a single id.
1723 executor.add([&]() {
1724 int totalNumberOfElements = 20;
1725 for (int i = 0; i < totalNumberOfElements; i++) {
1726 executor.add([=, &executor]() {
1727 doubleBatchOuterDispatch(executor, totalNumberOfElements, i);
1734 template <typename ExecutorT>
1735 void batchDispatchExceptionHandling(ExecutorT& executor, int i) {
1736 thread_local BatchDispatcher<int, int, ExecutorT> batchDispatcher(
1737 executor, [=, &executor](std::vector<int> &&) -> std::vector<int> {
1738 throw std::runtime_error("Surprise!!");
1741 EXPECT_THROW(batchDispatcher.add(i).get(), std::runtime_error);
1744 TEST(FiberManager, batchDispatchExceptionHandlingTest) {
1745 folly::EventBase evb;
1746 auto& executor = getFiberManager(evb);
1748 // Launch multiple fibers with a single id.
1749 executor.add([&]() {
1750 int totalNumberOfElements = 5;
1751 for (int i = 0; i < totalNumberOfElements; i++) {
1753 [=, &executor]() { batchDispatchExceptionHandling(executor, i); });
1759 namespace AtomicBatchDispatcherTesting {
1761 using ValueT = size_t;
1762 using ResultT = std::string;
1763 using DispatchFunctionT =
1764 folly::Function<std::vector<ResultT>(std::vector<ValueT>&&)>;
1766 #define ENABLE_TRACE_IN_TEST 0 // Set to 1 to debug issues in ABD tests
1767 #if ENABLE_TRACE_IN_TEST
1768 #define OUTPUT_TRACE std::cerr
1769 #else // ENABLE_TRACE_IN_TEST
1770 struct DevNullPiper {
1771 template <typename T>
1772 DevNullPiper& operator<<(const T&) {
1776 DevNullPiper& operator<<(std::ostream& (*)(std::ostream&)) {
1780 #define OUTPUT_TRACE devNullPiper
1781 #endif // ENABLE_TRACE_IN_TEST
1784 AtomicBatchDispatcher<ValueT, ResultT>::Token token;
1787 void preprocess(FiberManager& executor, bool die) {
1788 // Yield for a random duration [0, 10] ms to simulate I/O in preprocessing
1789 clock_t msecToDoIO = folly::Random::rand32() % 10;
1790 double start = (1000.0 * clock()) / CLOCKS_PER_SEC;
1791 double endAfter = start + msecToDoIO;
1792 while ((1000.0 * clock()) / CLOCKS_PER_SEC < endAfter) {
1796 throw std::logic_error("Simulating preprocessing failure");
1800 Job(AtomicBatchDispatcher<ValueT, ResultT>::Token&& t, ValueT i)
1801 : token(std::move(t)), input(i) {}
1803 Job(Job&&) = default;
1804 Job& operator=(Job&&) = default;
1807 ResultT processSingleInput(ValueT&& input) {
1808 return folly::to<ResultT>(std::move(input));
1811 std::vector<ResultT> userDispatchFunc(std::vector<ValueT>&& inputs) {
1812 size_t expectedCount = inputs.size();
1813 std::vector<ResultT> results;
1814 results.reserve(expectedCount);
1815 for (size_t i = 0; i < expectedCount; ++i) {
1816 results.emplace_back(processSingleInput(std::move(inputs[i])));
1822 AtomicBatchDispatcher<ValueT, ResultT>& atomicBatchDispatcher,
1823 std::vector<Job>& jobs,
1826 for (size_t i = 0; i < count; ++i) {
1827 jobs.emplace_back(Job(atomicBatchDispatcher.getToken(), i));
1831 enum class DispatchProblem {
1838 FiberManager& executor,
1839 std::vector<Job>& jobs,
1840 std::vector<folly::Optional<folly::Future<ResultT>>>& results,
1841 DispatchProblem dispatchProblem = DispatchProblem::None,
1842 size_t problemIndex = size_t(-1)) {
1844 dispatchProblem == DispatchProblem::None || problemIndex < jobs.size());
1846 results.resize(jobs.size());
1847 for (size_t i = 0; i < jobs.size(); ++i) {
1849 [i, &executor, &jobs, &results, dispatchProblem, problemIndex]() {
1851 Job job(std::move(jobs[i]));
1853 if (dispatchProblem == DispatchProblem::PreprocessThrows) {
1854 if (i == problemIndex) {
1855 EXPECT_THROW(job.preprocess(executor, true), std::logic_error);
1860 job.preprocess(executor, false);
1861 OUTPUT_TRACE << "Dispatching job #" << i << std::endl;
1862 results[i] = job.token.dispatch(job.input);
1863 OUTPUT_TRACE << "Result future filled for job #" << i << std::endl;
1865 if (dispatchProblem == DispatchProblem::DuplicateDispatch) {
1866 if (i == problemIndex) {
1867 EXPECT_THROW(job.token.dispatch(job.input), ABDUsageException);
1871 OUTPUT_TRACE << "Preprocessing failed for job #" << i << std::endl;
1877 void validateResult(
1878 std::vector<folly::Optional<folly::Future<ResultT>>>& results,
1881 OUTPUT_TRACE << "results[" << i << "].value() : " << results[i]->value()
1883 } catch (std::exception& e) {
1884 OUTPUT_TRACE << "Exception : " << e.what() << std::endl;
1889 template <typename TException>
1890 void validateResults(
1891 std::vector<folly::Optional<folly::Future<ResultT>>>& results,
1892 size_t expectedNumResults) {
1893 size_t numResultsFilled = 0;
1894 for (size_t i = 0; i < results.size(); ++i) {
1899 EXPECT_THROW(validateResult(results, i), TException);
1901 EXPECT_EQ(numResultsFilled, expectedNumResults);
1904 void validateResults(
1905 std::vector<folly::Optional<folly::Future<ResultT>>>& results,
1906 size_t expectedNumResults) {
1907 size_t numResultsFilled = 0;
1908 for (size_t i = 0; i < results.size(); ++i) {
1913 EXPECT_NO_THROW(validateResult(results, i));
1914 ValueT expectedInput = i;
1916 results[i]->value(), processSingleInput(std::move(expectedInput)));
1918 EXPECT_EQ(numResultsFilled, expectedNumResults);
1921 } // AtomicBatchDispatcherTesting
1923 #define SET_UP_TEST_FUNC \
1924 using namespace AtomicBatchDispatcherTesting; \
1925 folly::EventBase evb; \
1926 auto& executor = getFiberManager(evb); \
1927 const size_t COUNT = 11; \
1928 std::vector<Job> jobs; \
1929 jobs.reserve(COUNT); \
1930 std::vector<folly::Optional<folly::Future<ResultT>>> results; \
1931 results.reserve(COUNT); \
1932 DispatchFunctionT dispatchFunc
1934 TEST(FiberManager, ABD_Test) {
1938 // Testing AtomicBatchDispatcher with explicit call to commit()
1940 dispatchFunc = userDispatchFunc;
1941 auto atomicBatchDispatcher =
1942 createAtomicBatchDispatcher(std::move(dispatchFunc));
1943 createJobs(atomicBatchDispatcher, jobs, COUNT);
1944 dispatchJobs(executor, jobs, results);
1945 atomicBatchDispatcher.commit();
1947 validateResults(results, COUNT);
1950 TEST(FiberManager, ABD_DispatcherDestroyedBeforeCallingCommit) {
1954 // Testing AtomicBatchDispatcher destroyed before calling commit.
1955 // Handles error cases for:
1956 // - User might have forgotten to add the call to commit() in the code
1957 // - An unexpected exception got thrown in user code before commit() is called
1960 dispatchFunc = userDispatchFunc;
1961 auto atomicBatchDispatcher =
1962 createAtomicBatchDispatcher(std::move(dispatchFunc));
1963 createJobs(atomicBatchDispatcher, jobs, COUNT);
1964 dispatchJobs(executor, jobs, results);
1965 throw std::runtime_error(
1966 "Unexpected exception in user code before commit called");
1967 // atomicBatchDispatcher.commit();
1969 /* User code handles the exception and does not exit process */
1972 validateResults<ABDCommitNotCalledException>(results, COUNT);
1975 TEST(FiberManager, ABD_PreprocessingFailureTest) {
1979 // Testing preprocessing failure on a job throws
1981 dispatchFunc = userDispatchFunc;
1982 auto atomicBatchDispatcher =
1983 createAtomicBatchDispatcher(std::move(dispatchFunc));
1984 createJobs(atomicBatchDispatcher, jobs, COUNT);
1985 dispatchJobs(executor, jobs, results, DispatchProblem::PreprocessThrows, 8);
1986 atomicBatchDispatcher.commit();
1988 validateResults<ABDTokenNotDispatchedException>(results, COUNT - 1);
1991 TEST(FiberManager, ABD_MultipleDispatchOnSameTokenErrorTest) {
1995 // Testing that calling dispatch more than once on the same token throws
1997 dispatchFunc = userDispatchFunc;
1998 auto atomicBatchDispatcher =
1999 createAtomicBatchDispatcher(std::move(dispatchFunc));
2000 createJobs(atomicBatchDispatcher, jobs, COUNT);
2001 dispatchJobs(executor, jobs, results, DispatchProblem::DuplicateDispatch, 4);
2002 atomicBatchDispatcher.commit();
2006 TEST(FiberManager, ABD_GetTokenCalledAfterCommitTest) {
2010 // Testing that exception set on attempt to call getToken after commit called
2012 dispatchFunc = userDispatchFunc;
2013 auto atomicBatchDispatcher =
2014 createAtomicBatchDispatcher(std::move(dispatchFunc));
2015 createJobs(atomicBatchDispatcher, jobs, COUNT);
2016 atomicBatchDispatcher.commit();
2017 EXPECT_THROW(atomicBatchDispatcher.getToken(), ABDUsageException);
2018 dispatchJobs(executor, jobs, results);
2019 EXPECT_THROW(atomicBatchDispatcher.getToken(), ABDUsageException);
2021 validateResults(results, COUNT);
2022 EXPECT_THROW(atomicBatchDispatcher.getToken(), ABDUsageException);
2025 TEST(FiberManager, ABD_UserProvidedBatchDispatchThrowsTest) {
2029 // Testing that exception is set if user provided batch dispatch throws
2031 dispatchFunc = [](std::vector<ValueT>&& inputs) -> std::vector<ResultT> {
2032 (void)userDispatchFunc(std::move(inputs));
2033 throw std::runtime_error("Unexpected exception in user dispatch function");
2035 auto atomicBatchDispatcher =
2036 createAtomicBatchDispatcher(std::move(dispatchFunc));
2037 createJobs(atomicBatchDispatcher, jobs, COUNT);
2038 dispatchJobs(executor, jobs, results);
2039 atomicBatchDispatcher.commit();
2041 validateResults<std::runtime_error>(results, COUNT);
2044 TEST(FiberManager, VirtualEventBase) {
2045 folly::ScopedEventBaseThread thread;
2048 folly::make_unique<folly::VirtualEventBase>(*thread.getEventBase());
2050 folly::make_unique<folly::VirtualEventBase>(*thread.getEventBase());
2055 getFiberManager(*evb1).addTaskRemote([&] {
2057 baton.timed_wait(std::chrono::milliseconds{100});
2062 getFiberManager(*evb2).addTaskRemote([&] {
2064 baton.timed_wait(std::chrono::milliseconds{200});
2076 TEST(TimedMutex, ThreadFiberDeadlockOrder) {
2077 folly::EventBase evb;
2078 auto& fm = getFiberManager(evb);
2082 std::thread unlockThread([&] {
2083 /* sleep override */ std::this_thread::sleep_for(
2084 std::chrono::milliseconds{100});
2088 fm.addTask([&] { std::lock_guard<TimedMutex> lg(mutex); });
2090 runInMainContext([&] {
2091 auto locked = mutex.timed_lock(std::chrono::seconds{1});
2092 EXPECT_TRUE(locked);
2100 EXPECT_EQ(0, fm.hasTasks());
2102 unlockThread.join();
2105 TEST(TimedMutex, ThreadFiberDeadlockRace) {
2106 folly::EventBase evb;
2107 auto& fm = getFiberManager(evb);
2113 auto locked = mutex.timed_lock(std::chrono::seconds{1});
2114 EXPECT_TRUE(locked);
2121 runInMainContext([&] {
2122 auto locked = mutex.timed_lock(std::chrono::seconds{1});
2123 EXPECT_TRUE(locked);
2131 EXPECT_EQ(0, fm.hasTasks());
2135 * Test that we can properly track fiber stack usage.
2137 * This functionality can only be enabled when ASAN is disabled, so avoid
2138 * running this test with ASAN.
2140 #ifndef FOLLY_SANITIZE_ADDRESS
2141 TEST(FiberManager, recordStack) {
2143 folly::fibers::FiberManager::Options opts;
2144 opts.recordStackEvery = 1;
2146 FiberManager fm(folly::make_unique<SimpleLoopController>(), opts);
2147 auto& loopController =
2148 dynamic_cast<SimpleLoopController&>(fm.loopController());
2150 static constexpr size_t n = 1000;
2154 for (size_t i = 0; i < n; ++i) {
2157 for (size_t i = 0; i + 1 < n; ++i) {
2158 s += b[i] * b[i + 1];
2164 loopController.loop([&]() { loopController.stop(); });
2166 // Check that we properly accounted fiber stack usage.
2167 EXPECT_LT(n * sizeof(int), fm.stackHighWatermark());